2016-11-26 3 views
0

동일한 키를 공유하지 않더라도 다른 RDD를 기준으로 한 RDD의 요소를 필터링하는 방법이 있습니까?다른 RDD를 기반으로 하나의 RDD를 프룬합니다

내가 두 RDDs이 - ABC와 XYZ

abc.collect()이

[a,b,c] 

모양)이

[[a,b,c],[a,c,g,e],[a,e,b,x],[b,c]] 

xyz.collect (처럼 보이는 지금은 원하는 xyz에없는 RDD abc의 모든 요소를 ​​필터링합니다.

상기 수술 후, RDD ABC 방송은 다음과 같아야합니다

: 날이 오류를 제공하는, 그러나

def prune(eachlist): 
    for i in eachlist: 
     if i in xyz: 
      return i 

abc = abc.map(prune) 

을 :

[[a,b,c],[a,c],[a,b],[b,c]] 

내가 이렇게 생긴 코드를 작성

Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation 

나는 필터를 사용하지 않았으므로 대신지도를 찾아 보았습니다. 같은 오류가 계속 발생합니다.

xyz에서 수집 작업을 수행하고이 오류를 해결할 수 있음을 알고 있지만 큰 데이터 집합에서이를 실행하고 .collect()가 너무 많은 메모리를 초과하여 AWS 서버를 종료합니다. 따라서 .collect() 또는 이렇게 비싼 작업을 사용하지 않고이 작업을 수행해야합니다. 당신이 할 수있는

답변

2

:

# Add index 
abc.zipWithIndex() \ 
    # Flatten values 
    .flatMap(lambda x: [(k, x[1]) for k in x[0]]) \ 
    # Join with xyz (works as filter) 
    .join(xyz.map(lambda x: (x, None))) \ 
    # Group back by index 
    .map(lambda x: (x[1][0], x[0])) \ 
    .groupByKey() \ 
    .map(lambda x: list(x[1])) 

또는 꽃이 xyz 필터링 및 abc을지도하는 데 사용할 만들 수 있습니다.

+0

이 시도했지만 unhashable 형식 목록 오류로 끝났다 – Piyush

+0

다시 시도하십시오. 이 단계별로 단계별로 진행되었습니다. 왜 그것이 처음으로 작동하지 않았는지 모르겠습니다. 감사 LostinOverflow – Piyush

관련 문제