2016-11-21 2 views
2

에서 사용자에 대한 각 항목의 각 행을 생성 I는 다음과 같이 스파크 데이터 프레임을 가지고, 각각의 사용자들이 구매 한 아이템에 대한 행을 갖는스파크 dataframe

User Item Purchased 
1 A 1 
1 B 2 
2 A 3 
2 C 4 
3 A 3 
3 B 2 
3 D 6 

only showing top 5 rows 

. Purhcased가 얼마나 많은 수량인지 가정합니다. 구매 (카운트).

그러나 특정 사용자가 행을 갖고 있지 않은 항목에 대해 사용자가 구매하지 않았을 수있는 항목이 있습니다. 사용자가 구매 한 항목에 대한 행만 있습니다. 따라서 사용자 1이 항목 A, B를 구입 한 경우이 두 항목에 해당하는 사용자 1에 대해 2 개의 행이 있습니다. 그러나 사용자 2가 A, C를 구입 한 경우 사용자 2에게는 항목 A와 C에 대한 행이 있지만 B에는없는 행이 있습니다. 결국 각 사용자는 표의 모든 항목에 대한 모든 행과 해당 항목 수를 갖고 있어야합니다.

이 데이터 프레임을 위의 데이터 프레임으로 변환하고 사용자가 보지 못했던 항목에 대한 행을 가지고 해당 카운트를 0으로 만들고 싶습니다.

아래와 같은 생각

User Item Purchased 
1 A 1 
1 B 2 
1 C 0 
1 D 0 
2 A 3 
2 C 4 
2 B 0 
2 D 0 
3 A 3 
3 B 2 
3 D 6 
3 C 0 
only showing top 5 rows 

한가지 방법은 불꽃에서 그때 I 대응 값 열의 각 행을 변환 할 수있는 첫 번째 데이터 프레임의는 SqlContext의 cross_tab 방법을 사용하는 경우이었다. 사용자가 가지고 있지 않은 항목의 경우 동일한 항목을 만들고 거기에 0을 넣습니다.

하지만 어떻게 그 열을 다시 행으로 변환합니까?. 그것은 또한 원형 교차로 일 수 있습니다.

감사

답변

1
df = sqlContext.createDataFrame([(1, 'A', 2), (1, 'B', 3), (2, 'A', 2)], ['user', 'item', 'purchased']) 
pivot = df.groupBy('user').pivot('item').sum('purchased').fillna(0) 
items = [i['item'] for i in df.select('item').distinct().collect()] 
flattened_rdd = pivot.rdd.flatMap(lambda x: [(x['user'], i, x[i]) for i in items]) 
sqlContext.createDataFrame(flattened_rdd, ["user", "item", "purchased"]).show() 
+0

purchase_count 변수가 여기에 무엇입니까? 또한 평균을 취하는 이유는 무엇입니까? 나는 데이터에있는 그대로 실제 카운트를 원한다. – Baktaawar

+0

이 코드는 오랜 시간이 걸린다. 나는 노트북 스파크에서 작동하지만 시간이 많이 걸리는 – Baktaawar

+0

@ Baktaawar는 purchase_count로 구입했습니다. 그것을 반영하도록 내 코드를 수정했습니다. 왜 그렇게 오래 걸릴지 모르겠다. 4 초 안에 실행 된 1249956 행의 노트북에서 실행했습니다. 다음은 샘플 입력을 작성하는 코드입니다. 범위 (0, mod)의 j에 대해 영문자 = 목록 (string.uppercase) 범위 (1,100000) : mod = i % len (알파벳) output.append (i, alphabets [j], 1))' – None

1

우리는 단지뿐만 아니라 단지 df 함수를 사용하여이 작업을 수행 할 수 있습니다.

orders = [(1,"A",1),(1,"B",2),(2,"A",3),(2,"C",4),(3,"A",3),(3,"B",2),(3,"D",6)] 
df = sqlContext.createDataFrame(orders, ["user","item","purchased"]) 
df_items = df.select("item").distinct().repartition(5).withColumnRenamed("item", "item_1") 
df_users = df.select("user").distinct().repartition(5).withColumnRenamed("user", "user_1") 
df_cartesian = df_users.join(df_items) 
//above expression returns cartesian product of users and items dfs 
joined_df = df_cartesian.join(df, [df_cartesian.user_1==df.user, df_cartesian.item_1==df.item], "left_outer").drop("user").drop("item") 
result_df = joined_df.fillna(0,["purchased"]).withColumnRenamed("item_1", "item").withColumnRenamed("user_1", "user") 

마지막으로, result_df.show() 욕망의 출력은 다음과 같이 생성합니다

+----+----+---------+ 
|user|item|purchased| 
+----+----+---------+ 
| 2| A|  3| 
| 2| B|  0| 
| 2| C|  4| 
| 2| D|  0| 
| 3| A|  3| 
| 3| B|  2| 
| 3| C|  0| 
| 3| D|  6| 
| 1| A|  1| 
| 1| B|  2| 
| 1| C|  0| 
| 1| D|  0| 
+----+----+---------+ 
+0

파티션을 다시 분할해야합니까? – Baktaawar

+0

예, 그렇지 않으면'df_users.join (df_items)'는'200 * 200' 작업을 생성합니다. 여기서 200은 기본 파티션 수입니다. – avr

+0

문제는 실행되고 있지 않습니다. 12 시간 이상 달리고 심지어 아무런 결과도 얻지 못합니다. 나는 300K obs를 가지고있다. – Baktaawar

관련 문제