2017-10-03 1 views
2
여기

내 문제입니다 : 나는이 RDD있어 : 나는 다음 시도여러 열로 그룹화하고 PySpark에서 목록으로 수집하는 방법?

a = [[u'PNR1',u'TKT1',u'TEST',u'a2',u'a3'],[u'PNR1',u'TKT1',u'TEST',u'a5',u'a6'],[u'PNR1',u'TKT1',u'TEST',u'a8',u'a9']] 

rdd= sc.parallelize (a) 

을 :

rdd.map(lambda x: (x[0],x[1],x[2], list(x[3:]))) 

.toDF(["col1","col2","col3","col4"]) 

.groupBy("col1","col2","col3") 

.agg(collect_list("col4")).show 

마지막으로 나는이를 찾아야한다 :

[col1,col2,col3,col4]=[u'PNR1',u'TKT1',u'TEST',[[u'a2',u'a3'][u'a5',u'a6'][u'a8',u'a9']]] 

하지만 문제는 목록을 수집 할 수 없다는 것입니다. 누군가가 나를 도울 수 있다면 나는 그것을

+0

"나는 목록을 수집 할 수 없다"는 것은 무엇을 의미합니까? – eliasah

+0

collect_list 함수가 목록을 수신 할 수 없습니다. 목록 목록을 수집하려고합니다. –

+0

어떤 spark 버전을 사용하고 있습니까? – eliasah

답변

0

당신이 당신의 유일한 옵션을 2.x으로 업데이트 할 수 없습니다 때문에 주셔서 감사합니다

RDD API입니다.

하나의 아이디어가 당신의 col4 원시 데이터 형식, 즉 변환하는 것입니다 ... 이것은 당신의 일을 할 수

rdd.map(lambda x: ((x[0], x[1], x[2]), list(x[3:]))).groupByKey().toDF() 
1

(또는 추가를 진행하기 위해 몇 가지 아이디어를 제공) : 당신에게 현재의 코드를 바꿉니다 문자열 :

from pyspark.sql.functions import collect_list 
import pandas as pd 

a = [[u'PNR1',u'TKT1',u'TEST',u'a2',u'a3'],[u'PNR1',u'TKT1',u'TEST',u'a5',u'a6'],[u'PNR1',u'TKT1',u'TEST',u'a8',u'a9']] 
rdd = sc.parallelize(a) 

df = rdd.map(lambda x: (x[0],x[1],x[2], '(' + ' '.join(str(e) for e in x[3:]) + ')')).toDF(["col1","col2","col3","col4"]) 

df.groupBy("col1","col2","col3").agg(collect_list("col4")).toPandas().values.tolist()[0] 
#[u'PNR1', u'TKT1', u'TEST', [u'(a2 a3)', u'(a5 a6)', u'(a8 a9)']] 

UPDATE (자신의 대답 후) :

나는 정말로 위에서 언급 한 요점이 당신의 필요에 따라 더 나아질만큼 충분하다고 생각했다. 그 순간 나는 나 자신이 그것을 할 시간이 없었다.

df = rdd.map(lambda x: (x[0],x[1],x[2], ' '.join(str(e) for e in x[3:]))).toDF(["col1","col2","col3","col4"]) 

# temp list: 
ff = df.groupBy("col1","col2","col3").agg(collect_list("col4")).toPandas().values.tolist()[0] 
ff 
# [u'PNR1', u'TKT1', u'TEST', [u'a2 a3', u'a5 a6', u'a8 a9']] 

# final list of lists: 
ll = ff[:-1] + [[x.split(' ') for x in ff[-1]]] 
ll 

당신의 처음 요구 된 결과를 제공합니다 :

[u'PNR1', u'TKT1', u'TEST', [[u'a2', u'a3'], [u'a5', u'a6'], [u'a8', u'a9']]] # requested output 
을 그래서, 여기가 (괄호 없애 내 df 정의를 수정 한 후, 단일 지능형리스트의 문제입니다)입니다

이 방법은 특정 장점이 있습니다 자신의 대답에 제공하는 것과 비교 : 그것은 Pyspark UDF를 방지

  • , 이는 모든 처리 대신 초기 (아마도 훨씬 더 큰)의 데이터에지도 기능 및 UDF를 첨가하여 열을 제거하고, 수행 중, 응집 최종 (희망 훨씬 적은) 데이터로 이루어진다 known to be slow
  • 있다
+0

사실 나는 col4에 목록의 목록이 필요합니다. 예를 들어 문자열 형식 (a2 a3)을 입력 했으므로 [[a2, a3], [a5, a6], [a8 , a9]] –

+0

@CarlosLopezSobrino 업데이트보기 – desertnaut

+0

@CarlosLopezSobrino가 정확히 무엇을 요청했는지 업데이트 된 대답이 아닙니까? – desertnaut

1

나는 마지막으로 ...

from pyspark.sql.functions import udf 
from pyspark.sql.functions import * 

def example(lista): 
    d = [[] for x in range(len(lista))] 
    for index, elem in enumerate(lista): 
     d[index] = elem.split("@") 
    return d 
example_udf = udf(example, LongType()) 

a = [[u'PNR1',u'TKT1',u'TEST',u'a2',u'a3'],[u'PNR1',u'TKT1',u'TEST',u'a5',u'a6'],[u'PNR1',u'TKT1',u'TEST',u'a8',u'a9']] 

rdd= sc.parallelize (a) 

df = rdd.toDF(["col1","col2","col3","col4","col5"]) 

df2=df.withColumn('col6', concat(col('col4'),lit('@'),col('col5'))).drop(col("col4")).drop(col("col5")).groupBy([col("col1"),col("col2"),col("col3")]).agg(collect_set(col("col6")).alias("col6")) 

df2.map(lambda x: (x[0],x[1],x[2],example(x[3]))).collect() 

이 가장 좋은 방법은 아니지만 내가 작업을 계속할 수 있습니다, 해결책을 발견하고는 제공 :

[(u'PNR1', u'TKT1', u'TEST', [[u'a2', u'a3'], [u'a5', u'a6'], [u'a8', u'a9']])] 

이 솔루션이 도움이되기를 바랍니다.

답장을 보내 주셔서 감사합니다.

+0

pls 내 업데이트 된 답변보기 – desertnaut

관련 문제