2017-05-16 1 views
-3

스파크가 새로 생겼고 다른 모든 rdd에 나타나는 항목이 포함 된 최종 rdd를 필터링하려고합니다.모든 rdd에 나타나는 항목 가져 오기 - Pyspark

내 코드

a = ['rs1','rs2','rs3','rs4','rs5'] 
b = ['rs3','rs7','rs10','rs4','rs6'] 
c = ['rs10','rs13','rs20','rs16','rs1'] 
d = ['rs2', 'rs4', 'rs5', 'rs13', 'rs3'] 

a_rdd = spark.parallelize(a) 
b_rdd = spark.parallelize(b) 
c_rdd = spark.parallelize(c) 
d_rdd = spark.parallelize(d) 

rdd = spark.union([a_rdd, b_rdd, c_rdd, d_rdd]).distinct() 

결과 : [ 'RS4', 'RS16', 'RS5', 'RS6', 'RS7', 'RS20', 'RS1', 'rs13', 'RS10 ','RS2 ','RS3 ']

내 예상 된 결과가 [입니다'RS3 ','RS4 ']

감사합니다!

+0

설명서에 대해 더 자세히 읽어 보시기 바랍니다. https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.distinct. 내부 조인을 확인하십시오. –

+0

내 잘못, 그 API 문서 페이지를 찾지 못했습니다, 그것에 더 많은 시간을 보냅니다 감사합니다 – pthphap

답변

1

모든 rdd의 항목이 포함 된 rdd를 원한다고 말하면 교차를 의미합니까? 그건 당신이 노조와 rdds의 교회법을 사용하지 말아야하는 경우가 비어 있다면

(어떤 요소는 4 rdds에서 반복되지 않습니다)하지만 당신은 rdds의 교차 할 필요가있는 경우 :

def intersection(*args): 
     return reduce(lambda x,y:x.intersection(y),args) 

    a = ['rs1','rs2','rs3','rs4','rs5'] 
    b = ['rs3','rs7','rs1','rs2','rs6'] 
    c = ['rs10','rs13','rs2','rs16','rs1'] 
    d = ['rs2', 'rs4', 'rs1', 'rs13', 'rs3'] 

    a_rdd = sc.parallelize(a) 
    b_rdd = sc.parallelize(b) 
    c_rdd = sc.parallelize(c) 
    d_rdd = sc.parallelize(d) 

    rdd = sc.union([a_rdd, b_rdd, c_rdd, d_rdd]).distinct() 
    intersection(a_rdd, b_rdd, c_rdd, d_rdd).collect() 

출력이 [ 'rs1', 'rs2']

+0

나는'reduce'에 다음과 같이 추가 할 수있는 제안을했습니다 :'reduce (RDD.intersection, args)' –

+1

ah 예 그것은 더 우아한 방법입니다 :) –

+0

이것은 매력처럼 작동합니다. 고맙습니다 – pthphap

관련 문제