(또는 추가를 진행하기 위해 몇 가지 아이디어를 제공) : 당신에게 현재의 코드를 바꿉니다 문자열 :
from pyspark.sql.functions import collect_list
import pandas as pd
a = [[u'PNR1',u'TKT1',u'TEST',u'a2',u'a3'],[u'PNR1',u'TKT1',u'TEST',u'a5',u'a6'],[u'PNR1',u'TKT1',u'TEST',u'a8',u'a9']]
rdd = sc.parallelize(a)
df = rdd.map(lambda x: (x[0],x[1],x[2], '(' + ' '.join(str(e) for e in x[3:]) + ')')).toDF(["col1","col2","col3","col4"])
df.groupBy("col1","col2","col3").agg(collect_list("col4")).toPandas().values.tolist()[0]
#[u'PNR1', u'TKT1', u'TEST', [u'(a2 a3)', u'(a5 a6)', u'(a8 a9)']]
UPDATE (자신의 대답 후) :
나는 정말로 위에서 언급 한 요점이 당신의 필요에 따라 더 나아질만큼 충분하다고 생각했다. 그 순간 나는 나 자신이 그것을 할 시간이 없었다.
df = rdd.map(lambda x: (x[0],x[1],x[2], ' '.join(str(e) for e in x[3:]))).toDF(["col1","col2","col3","col4"])
# temp list:
ff = df.groupBy("col1","col2","col3").agg(collect_list("col4")).toPandas().values.tolist()[0]
ff
# [u'PNR1', u'TKT1', u'TEST', [u'a2 a3', u'a5 a6', u'a8 a9']]
# final list of lists:
ll = ff[:-1] + [[x.split(' ') for x in ff[-1]]]
ll
당신의 처음 요구 된 결과를 제공합니다 :
[u'PNR1', u'TKT1', u'TEST', [[u'a2', u'a3'], [u'a5', u'a6'], [u'a8', u'a9']]] # requested output
을 그래서, 여기가 (괄호 없애 내
df
정의를 수정 한 후, 단일 지능형리스트의 문제입니다)입니다
이 방법은 특정 장점이 있습니다 자신의 대답에 제공하는 것과 비교 : 그것은 Pyspark UDF를 방지
- , 이는 모든 처리 대신 초기 (아마도 훨씬 더 큰)의 데이터에지도 기능 및 UDF를 첨가하여 열을 제거하고, 수행 중, 응집 최종 (희망 훨씬 적은) 데이터로 이루어진다 known to be slow
- 있다
"나는 목록을 수집 할 수 없다"는 것은 무엇을 의미합니까? – eliasah
collect_list 함수가 목록을 수신 할 수 없습니다. 목록 목록을 수집하려고합니다. –
어떤 spark 버전을 사용하고 있습니까? – eliasah