2017-11-13 1 views
0

두 개의 PySpark DataFrames가 있습니다.DataFrames에 가입하고 타임 스탬프로 최신 행을 가져 오는 방법은 무엇입니까?

df = df1.join(df2,['col1', 'col2'], 'inner') 

DataFrame df2는 열 timestamp을 가지고 df1 동안이없고 :

df1 = 
col1 col2 
AA  11 
BB  22 

df2 = 
timestamp col1 col2 col3 
1510586134 AA  11 3 
1510586140 AA  11 2 
1510586200 AA  11 5 
1510586134 BB  22 3 

어떻게 timestamp에 따라 df2의 새로운 행에 의해 DataFrames에 가입하실 수 있습니다 다음과 같이 둘 다 DataFrames 가입 ?

결과는 다음과 같아야합니다

col1 col2 col3 
AA  11  5 
BB  22  3 
+0

사용 윈도우 함수. window.partitionBy ("col1", "col2"). orderBy ("timestamp")'그런 다음 결과 데이터 프레임을'col1, col2'를 키로 결합하십시오. – philantrovert

+0

@philantrovert : .orderBy (col ("timestam"). desc()? 가장 최근의 것을 가져 오기 위해'orderBy' 다음에'where'를 사용해야합니까? – Dinosaurius

답변

1

희망이 도움이!

from pyspark.sql.functions import col, rank 
from pyspark.sql.window import Window 

#sample data 
df1 = sc.parallelize([ 
    ['AA', 11], 
    ['BB', 22] 
]).toDF(('col1', 'col2')) 
df2 = sc.parallelize([ 
    [1510586134, 'AA', 11, 3], 
    [1510586140, 'AA', 11, 2], 
    [1510586200, 'AA', 11, 5], 
    [1510586134, 'BB', 22, 3] 
]).toDF(('timestamp', 'col1', 'col2', 'col3')) 

#select latest row of df2 according to timestamp 
df2_temp = df2.withColumn('timestamp_format_col', col('timestamp').cast("timestamp")) 
window = Window.partitionBy('col1','col2').\ 
    orderBy(col('timestamp_format_col').desc()) 
df2_temp = df2_temp.\ 
    select('*', rank().over(window).alias('rank')).\ 
    filter(col('rank')==1).\ 
    drop('rank','timestamp','timestamp_format_col') 

#final result 
df = df1.join(df2_temp, ['col1', 'col2'], 'inner') 
df.show() 

출력은 다음과 같습니다 최신 행과 게타 dataframe에

+----+----+----+ 
|col1|col2|col3| 
+----+----+----+ 
| BB| 22| 3| 
| AA| 11| 5| 
+----+----+----+ 
+0

@Dinosaurius는 도움이되는지 알려주지 않습니다. 당신은 당신의 문제를 해결합니다 :) – Prem

관련 문제