2016-06-30 3 views
1

아파치 스파크 내에서 두 개의 서로 다른 RDD 사이의 캔버라 거리를 계산하려고합니다. RDD의 크기는 동일하지만 특별히 큰 것은 아닙니다.아파치 스파크로 캔버라 거리 계산하기

누구나 RDD API에서이를 수행하는 가장 좋은 방법에 대한 제안이 있습니까? 캔버라 거리에 대한 공식은 아래의 링크에서 볼 수 있습니다.

Canberra Distance Between Two Vectors

+0

에 오신 것을 환영합니다 SO에 있습니다. [묻는 방법] (https://stackoverflow.com/help/how-to-ask) 및 [mcve] 생성 방법을 읽으십시오. 많은 사용자가 도움을 줄 준비가되어 있지만 코드 작성 서비스는 아닙니다. 즉, 지금까지의 노력을 보여주고 시도를 포함하십시오. 또한 언어 태그를 포함하는 것을 잊지 마십시오. – zero323

답변

1

각 RDD에 대한 인덱스를 생성 한 다음에 가입해야합니다. 이렇게하면지도를 실행하여 각 쌍의 거리를 계산 한 다음 총 합계를 수집 할 수 있습니다.

다음 작업을해야합니다 :

// I am assuming here that your vectors are initially stored as arrays of Double 
val dataX = sc.parallelize(Array(11.0, 12.0, 13.0, 14.0, 15.0)) 
val dataY = sc.parallelize(Array(21.0, 22.0, 23.0, 24.0, 25.0)) 

def canberraDist(sc: SparkContext, X: RDD[Double], Y: RDD[Double]): Double ={  
    // Create an index based on length for each RDD. 
    // Index is added as second value so use map to switch order allowing join to work properly. 
    // This can be done in the join step, but added here for clarity. 
    val RDDX = X.zipWithIndex().map(x => (x._2,x._1)) 
    val RDDY = Y.zipWithIndex().map(x => (x._2,x._1)) 

    // Join the 2 RDDs on index value. Returns: RDD[(Long, (Double, Double))] 
    val RDDJoined = RDDX.map(x => (x._1,x._2)).join(RDDY.map(x => (x._1,x._2))) 

    // Calculate Canberra Distance 
    val distance = RDDJoined.map{case (id, (x,y)) => { (math.abs(x - y)/(math.abs(x) + math.abs(y))) } }.reduce(_+_) 
    // Return Value 
    return distance 
} 

var totalDist = canberraDist(sc, dataX, dataY) 
println(totalDist) 
+0

감사합니다. 완벽하게 작동했습니다! – RobertFrobisher

관련 문제