2016-10-18 9 views
2

여러 열 GROUPBY에서 최대 값을 행하기 내가PySpark

from pyspark.sql.functions import avg, first 

rdd = sc.parallelize(
[ 
(0, "A", 223,"201603", "PORT"), 
(0, "A", 22,"201602", "PORT"), 
(0, "A", 22,"201603", "PORT"), 
(0, "C", 22,"201605", "PORT"), 
(0, "D", 422,"201601", "DOCK"), 
(0, "D", 422,"201602", "DOCK"), 
(0, "C", 422,"201602", "DOCK"), 
(1,"B", 3213,"201602", "DOCK"), 
(1,"A", 3213,"201602", "DOCK"), 
(1,"C", 3213,"201602", "PORT"), 
(1,"B", 3213,"201601", "PORT"), 
(1,"B", 3213,"201611", "PORT"), 
(1,"B", 3213,"201604", "PORT"), 
(3,"D", 3999,"201601", "PORT"), 
(3,"C", 323,"201602", "PORT"), 
(3,"C", 323,"201602", "PORT"), 
(3,"C", 323,"201605", "DOCK"), 
(3,"A", 323,"201602", "DOCK"), 
(2,"C", 2321,"201601", "DOCK"), 
(2,"A", 2321,"201602", "PORT") 
] 
) 
df_data = sqlContext.createDataFrame(rdd, ["id","type", "cost", "date", "ship"]) 

유사한 dataframe을 가지고 내가 idtype에 의해 집계 및 그룹 당 ship의 가장 높은 발생을 얻을 필요가있다. 예를 들어,

grouped = df_data.groupby('id','type', 'ship').count() 

각 그룹의 횟수와 열이 있습니다

+---+----+----+-----+ 
| id|type|ship|count| 
+---+----+----+-----+ 
| 3| A|DOCK| 1| 
| 0| D|DOCK| 2| 
| 3| C|PORT| 2| 
| 0| A|PORT| 3| 
| 1| A|DOCK| 1| 
| 1| B|PORT| 3| 
| 3| C|DOCK| 1| 
| 3| D|PORT| 1| 
| 1| B|DOCK| 1| 
| 1| C|PORT| 1| 
| 2| C|DOCK| 1| 
| 0| C|PORT| 1| 
| 0| C|DOCK| 1| 
| 2| A|PORT| 1| 
+---+----+----+-----+ 

을하고 난

의 조합을 사용하려고

+---+----+----+-----+ 
| id|type|ship|count| 
+---+----+----+-----+ 
| 0| D|DOCK| 2| 
| 0| A|PORT| 3| 
| 1| A|DOCK| 1| 
| 1| B|PORT| 3| 
| 2| C|DOCK| 1| 
| 2| A|PORT| 1| 
| 3| C|PORT| 2| 
| 3| A|DOCK| 1| 
+---+----+----+-----+ 

을 얻을 필요

grouped.groupby('id', 'type', 'ship')\ 
.agg({'count':'max'}).orderBy('max(count)', ascending=False).\ 
groupby('id', 'type', 'ship').agg({'ship':'first'}) 

하지만 실패합니다. 그룹 카운트에서 최대 행을 얻을 수있는 방법이 있습니까? - 이미 grouped에 고유 한 값을 갖고 있기 때문에 - 결과적으로 중복 요소를 삭제하여 예상 출력을 바탕으로

df_pd = df_data.toPandas() 
df_pd_t = df_pd[df_pd['count'] == df_pd.groupby(['id','type', ])['count'].transform(max)] 
+0

가능한 복제 (http://stackoverflow.com/questions/35218882/find-maximum- :

이를 위해, 우리는 Window 기능을 사용할 수 있습니다 행 그룹당 스파크 - 데이터 프레임) –

+0

표시된 게시물에는 그룹 기준 차원이 하나뿐입니다. 해당 게시물의 세 가지 방법으로 확장하는 방법이 명확하지 않습니다. – Ivan

+0

대답에 영향을 미치지 않습니다. 그냥 파티션이나 그룹에 넣어 더. –

답변

3

, 당신은 idship에 의해 그룹화하는 것 같다 팬더에

이 oneliner는 일을 id, shipcount을 기준으로 type로 정렬. [스파크 DataFrame에서 그룹 당 최대 행을 찾기]의

from pyspark.sql.window import Window 
from pyspark.sql.functions import rank, col 

window = (Window 
      .partitionBy(grouped['id'], 
         grouped['ship']) 
      .orderBy(grouped['count'].desc(), grouped['type'])) 


(grouped 
.select('*', rank() 
     .over(window) 
     .alias('rank')) 
    .filter(col('rank') == 1) 
    .orderBy(col('id')) 
    .dropDuplicates(['id', 'ship', 'count']) 
    .drop('rank') 
    .show()) 
+---+----+----+-----+ 
| id|type|ship|count| 
+---+----+----+-----+ 
| 0| D|DOCK| 2| 
| 0| A|PORT| 3| 
| 1| A|DOCK| 1| 
| 1| B|PORT| 3| 
| 2| C|DOCK| 1| 
| 2| A|PORT| 1| 
| 3| A|DOCK| 1| 
| 3| C|PORT| 2| 
+---+----+----+-----+