2015-01-05 2 views
2

아래의 코드로 두 개의 RDD로 데이터를 인쇄 할 수 있습니다.두 개의 RDD 데이터를 스파크로 비교

usersRDD.foreach(println) 
empRDD.foreach(println) 

두 RDD의 데이터를 비교해야합니다. 한 RDD의 필드 데이터를 다른 RDD의 필드 데이터와 반복 및 비교할 수 있습니까? 예 : 레코드를 반복하고 userRDD의 이름과 나이가 일치하는 레코드가 empRDD에 있는지 확인합니다 (별도 RDD에 입력하지 않은 경우).

나는 userRDD.substract(empRDD)으로 시도했지만 모든 필드를 비교했습니다.

답변

5

레코드를 조인 할 항목이 있도록 각 RDD의 데이터를 키해야합니다. 예를 들어 groupBy을보십시오. 그런 다음 join 결과 RDD. 각 키에 대해 두 값 모두에서 일치하는 값을 가져옵니다. 당신은 타의 추종을 불허하는 키를 찾는 데 관심이 있다면,이 같은 leftOuterJoin를 사용 : 물론

// Returns the entries in userRDD that have no corresponding key in empRDD. 
def nonEmp(userRDD: RDD[(String, String)], empRDD: RDD[(String, String)]) = { 
    userRDD.leftOuterJoin(empRDD).collect { 
    case (name, (age, None)) => name -> age 
    } 
} 
+1

OP와 같은 소리는 한 RDD에 있고 다른 LDD에없는 키를 찾는 데 관심이있을 수 있습니다. 이를 위해서는'join' 대신'leftOuterJoin'이 필요합니다. 대답에 언급 할 가치가 있습니다. –

+0

감사합니다 션,하지만 두 RDD에서 일치하지 않는 데이터를 찾아야합니다. 샘플 코드를 제공하면 좋을 것입니다. – Ramakrishna

+0

감사합니다 @ 대니얼 다라 보스, 맞습니다. 나는 원본 텍스트를 잘못 읽었습니다. –

1

위의 솔루션은 완전하고 정확! RDD가 동기화 된 경우에만 동일한 제안 (동일한 행에 동일한 키가 있음). 당신은 분산 솔루션을 사용하여 다음과 같은 테스트 솔루션을 통해서만 스파크 변환을 사용하여 병렬 처리를 이용할 수 :

def distrCompare(left: RDD[(Int,Int)], right: RDD[(Int,Int)]): Boolean = { 
    val rdd1 = left.join(right).map{case(k, (lv,rv)) => (k,lv-rv)} 
    val rdd2 = rdd1.filter{case(k,v)=>(v!=0)} 
    var equal = true; 
    rdd2.map{ 
    case(k,v)=> if(v!=0) equal = false 
    } 
    return equal 
} 

당신은 "가입"에서 파티션의 수를 선택할 수 있습니다.