2016-11-22 1 views
3

https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html 스파크를 피봇 작동하는 방법을 정교하게 설명합니다. 내 파이썬 코드에서집진없이 스파크 피봇

, 나는 인덱스와 조인을 집계하지 않고 팬더를 사용하지만 재설정 :

pd.pivot_table(data=dfCountries, index=['A'], columns=['B']) 
countryToMerge.index.name = 'ISO' 
df.merge(countryToMerge['value'].reset_index(), on='ISO', how='inner') 

어떻게 불꽃이 작동합니까?

나는 그룹에 노력하고 수동으로 같은 조인 :

val grouped = countryKPI.groupBy("A").pivot("B") 
df.join(grouped, df.col("ISO") === grouped.col("ISO")).show 

을하지만 작동하지 않습니다. reset_index는 spark에 어떻게 적합할까요?/spark native 방식으로 어떻게 구현 될까요?

편집

파이썬 코드의 최소한의 예 :

import pandas as pd 
from datetime import datetime, timedelta 
import numpy as np 
dates = pd.DataFrame([(datetime(2016, 1, 1) + timedelta(i)).strftime('%Y-%m-%d') for i in range(10)], columns=["dates"]) 
isos = pd.DataFrame(["ABC", "POL", "ABC", "POL","ABC", "POL","ABC", "POL","ABC", "POL"], columns=['ISO']) 
dates['ISO'] = isos.ISO 
dates['ISO'] = dates['ISO'].astype("category") 
countryKPI = pd.DataFrame({'country_id3':['ABC','POL','ABC','POL'], 
         'indicator_id':['a','a','b','b'], 
         'value':[7,8,9,7]}) 
countryToMerge = pd.pivot_table(data=countryKPI, index=['country_id3'], columns=['indicator_id']) 
countryToMerge.index.name = 'ISO' 
print(dates.merge(countryToMerge['value'].reset_index(), on='ISO', how='inner')) 

    dates ISO a b 
0 2016-01-01 ABC 7 9 
1 2016-01-03 ABC 7 9 
2 2016-01-05 ABC 7 9 
3 2016-01-07 ABC 7 9 
4 2016-01-09 ABC 7 9 
5 2016-01-02 POL 8 7 
6 2016-01-04 POL 8 7 
7 2016-01-06 POL 8 7 
8 2016-01-08 POL 8 7 
9 2016-01-10 POL 8 7 

스칼라에 함께 따라하기/스파크

val dates = Seq(("2016-01-01", "ABC"), 
    ("2016-01-02", "ABC"), 
    ("2016-01-03", "POL"), 
    ("2016-01-04", "ABC"), 
    ("2016-01-05", "POL"), 
    ("2016-01-06", "ABC"), 
    ("2016-01-07", "POL"), 
    ("2016-01-08", "ABC"), 
    ("2016-01-09", "POL"), 
    ("2016-01-10", "ABC") 
).toDF("dates", "ISO") 
    .withColumn("dates", 'dates.cast("Date")) 

    dates.show 
    dates.printSchema 

    val countryKPI = Seq(("ABC", "a", 7), 
    ("ABC", "b", 8), 
    ("POL", "a", 9), 
    ("POL", "b", 7) 
).toDF("country_id3", "indicator_id", "value") 

    countryKPI.show 
    countryKPI.printSchema 

val grouped = countryKPI.groupBy("country_id3").pivot("indicator_id") 

답변

0

다음 코드는 작동하는 것 같다 -하지만 난 확실하지 않다 avg에 의한 집계가 정확하다면 - "fitting numbers"가 결과물이다.

countryKPI.groupBy("country_id3").pivot("indicator_id").avg("value").show 

나는 (I 집계하지 않는 한)이 단지 값을 재사용에 비해 데이터의 더 큰 금액 (평균)은 "비효율적"입니다 있는지 확실하지 않습니다.

+0

심지어 집계가없는 피벗 기능을 찾고 있습니다. @Georg Heiler, 그 동안 뭔가 발견 했습니까? – user3560220

+0

여기에 게시 된 내용 만. 운수 나쁘게. –