2016-12-09 3 views
0

나는 userid (사용자 ID), day (오늘)의 열을 가진 DataFrame (df)을 가지고 있습니다.pySpark, 집계 복합 함수 (연속 이벤트의 차이)

저는 모든 사용자에 대해 매일 자신이 활성화 된 시간 간격의 평균을 계산하는 데 관심이 있습니다. DataFrame가 팬더 DataFrame 경우

예를 들어, 특정 사용자에 대한 DataFrame 나는 내가 관심이있는 양을 계산할 수있다,이

userid  day  
1   2016-09-18   
1   2016-09-20 
1   2016-09-25  

처럼 보일 수도 있습니다

import numpy as np 
np.mean(np.diff(df[df.userid==1].day)) 
을 같은

그러나 DataFrame에 수백만 명의 사용자가 있기 때문에 이것은 매우 비효율적이지만이 방법으로 수행 할 수 있다고 믿습니다.

df.groupby("userid").agg({"day": lambda x: np.mean(np.diff(x))}) 

첫 번째 문제는 np.mean(np.diff(x))을 적용하기 전에 날짜를 정렬해야하므로 잘 작동하는지 잘 모르겠다는 것입니다.

두 번째 질문은 DataFrame을 Pandas DataFrame으로 변환 할 때만 수행 할 수 있기 때문에 비효율적이라는 것입니다.

pySpark와 똑같은 일을하는 방법이 있습니까?

답변

1

창 기능이 구출됩니다. 일부 수입 :

from pyspark.sql.functions import col, datediff, lag 
from pyspark.sql.window import Window 

창 정의

w = Window().partitionBy("userid").orderBy("day") 

및 쿼리

(df 
    .withColumn("diff", datediff(lag("day", 1).over(w), "day")) 
    .groupBy("userid") 
    .mean("diff"))