0

Apache Spark 작업을 실행할 때 직면 한 문제 중 하나는 RDD의 각 요소를 서로 곱하는 것입니다. 단순히 Spark RDD의 요소에 서로 곱하기

enter image description here

는 현재, 나는 각 'foreach는'2 반복자를 사용하여이 일을 해요, 나는이 비슷한을하고자했습니다. 내 직감은 이것이 매우 효율적인 방식으로 수행 될 수 있다는 것입니다.

for (elementOutSide <- iteratorA) { 
    for (elementInside <- iteratorB) { 
    if (!elementOutSide.get(3).equals(elementInside.get(3))) { 
     val multemp = elementInside.getLong(3) * elementOutSide.getLong(3) 
     .... 
     ... 

}}} 

상황을 개선하고 개선하는 데 도움을 줄 수있는 사람이 누구입니까? 미리 감사드립니다 .. !!

+0

나는 당신이 정규 직교 조인을 찾고 있다고 생각한다. – Alec

+1

사실, 귀하의 구현은 실제로 요구 사항에 맞지 않습니다. 원래 RDD의 레코드가 _unique_ 인 경우에만 작동하는 _indices_가 아닌 실제 _element를 비교합니다. –

+0

그들은 고유합니다. RDD는이를 보장하는 SQL 쿼리를 사용하여 구성됩니다. – Infamous

답변

1

주석으로 지적했듯이, 이것은 데카르트 조인입니다. 여기가 우리가 매 2 동일하지 않은 Int의의 곱셈에 관심이 RDD[(Int, String)]에 할 수있는 방법은 다음과 같습니다

val rdd: RDD[(Int, String)] = sc.parallelize(Seq(
    (1, "aa"), 
    (2, "ab"), 
    (3, "ac") 
)) 

// use "cartesian", then "collect" to map only relevant results 
val result: RDD[Int] = rdd.cartesian(rdd).collect { 
    case ((t1: Int, _), (t2: Int, _)) if t1 != t2 => t1 * t2 
} 

참고 :이 구현은 지시대로 입력 기록, 고유 가정합니다. 그렇지 않은 경우에는 값 대신 인덱스를 비교하면서 rdd.zipWithIndex의 결과에 대한 데카르트 조인과 맵핑을 수행 할 수 있습니다.