2017-11-20 1 views
0

이 질문에 약간의 문제가 있습니다.Pyspark - 'previous'레코드를 찾아 데이터 프레임에 새 열을 추가합니다.

나는 청소를 시도하는 중 많은 양의 이벤트 데이터가 있습니다. 요구 사항 중 하나는 이전 이벤트의 데이터를 포함하는 것이고 pyspark을 사용하여이 작업을 수행하는 적절한 방법을 찾기 위해 고심하고 있습니다.

시도하고 설명하십시오. 내 데이터 프레임을 가정하면 다음과 같다 : 각 레코드에 대한

uid| id|   event_time| event_value| 
---|---|--------------------|------------| 
1 | 1| 2017-11-20 12:00:00|   a| 
2 | 1| 2017-11-20 13:00:00|   b| 
3 | 2| 2017-11-20 12:00:00|   c| 
4 | 2| 2017-11-20 13:00:00|   d| 
5 | 2| 2017-11-20 14:00:00|   e| 

, 나는 같은 ID를 가진 가장 최근의 이전 이벤트를 발견하고 새로운 컬럼으로이를 추가 할 수 있습니다. 즉

uid| id|   event_time| event_value| previous_event_value| 
---|---|--------------------|------------|---------------------| 
1 | 1| 2017-11-20 12:00:00|   a|     null| 
2 | 1| 2017-11-20 13:00:00|   b|     a| 
3 | 2| 2017-11-20 12:00:00|   c|     null| 
4 | 2| 2017-11-20 13:00:00|   d|     c| 
5 | 2| 2017-11-20 14:00:00|   e|     d| 

몇 가지 창 기능을 살펴 보았지만이 사용 사례를 100 % 확신 할 수는 없습니다. 어떤 도움을 주시면 감사하겠습니다.

+1

네,'lag' (윈도우 기능) –

+1

동안 UID 및 EVENT_TIME 상승에 의한 ID 및 주문에 의해 paritionin – philantrovert

+0

감사를 모두 사용합니다. 나는 지금 일하고있는 것처럼 보이는이 것을 가지고있다 ... 'df = df.withColumn ("previous_event_value", lag (df.event_value) .over (Window.partitionBy ("id"). orderBy ("event_time ")))') – robarthur1

답변

0

다른 누군가를 넣으면이 문제가 발생합니다. 위의 제안은 완벽하게 작동했습니다. 지연 창 기능 사용. 예 :

df = df.withColumn("previous_event_value", 
    lag(df.event_value).over(Window.partitionBy("id").orderBy("e‌​vent_time"))) 
관련 문제