2015-01-30 3 views
3

HBase 테이블에 연결된 RDD가 있습니다. 각 행 (키)은 GPS 위치를 나타냅니다. 이제는 두 점 사이의 거리를 계산하는 함수를 작성했습니다. 이 함수는 현재 행과 그 전임자와 함께 호출되어야합니다. [i-1]순차적 RDD 처리의 기능적 접근 [Apache Spark]

이제 RDD 함수를 사용하여 기능적으로이 작업을 수행하여 병렬화 할 수 있습니다.

내 신속하고 더러운 방법은 첫째 작동 거리

//create a parallel-enabled data set 
    val parallelDataSet = sc.parallelize(rows) 

    parallelDataSet.foreach(row => {  
    Functions.logDistance(row) 
}) 

을 그럼 난 배열을 병렬화 것이다 배열을

val rows = rdd.collect() 
val rowCount = rdd.count() - 1 //since the first row has no distance 
val rowArray = new Array[(String, Point, Point)](rowCount.asInstanceOf[Int]) 
var i = 0 //can be better solved in scala, I know ;) 

rows.foreach(row => { 
    if (predecssorPoint == null) { 
    predecssorPoint = getPointByRow(row._2) 
    } 
    else { 
    currentPoint = getPointByRow(row._2) 
    rowArray(i) = Tuple3(row._1, predecssorPoint, currentPoint) 

    i += 1 
    predecssorPoint = currentPoint 
    } 
}) 

return rowArray 

를 작성하고 계산하는 것입니다 그러나 그것은 추악한 확실히 비효율적입니다.

내 아이디어는 rdd.reduce()를 사용하여 foreach 루프를 없애고 거리 함수가 (a + b)의 순서가 보장되지 않는다는 문제를 처리하는 경우 작동 할 수 있다는 것을 알고 있습니다.

어쨌든 더 나은 해결책이 있습니까? 필자가 알고있는 점은 RDD로 작업 할 때 (효율적으로) 색인 액세스 할 수있는 가능성이 없다는 것입니다.

감사합니다.

답변

2

여기에서 주문이 중요하다는 점을 감안할 때, 진행하는 좋은 방법은 먼저 RDD를 색인하는 것입니다. 그런 다음 인덱스를 사용하여 zip을 시뮬레이트하고 클러스터를 통해 튜플을 분할 할 수 있습니다. 이런 식으로 뭔가 :

val indexed = rdd.zipWithIndex.map(_.swap) // 
val shifted = indexed.map{case (k,v) => (k-1,v)} 
val joined = indexed.join(shifted) 
val distanceRDD = joined.map{(k,(v1,v2)) => distanceFunction(v1,v2)} 

(*) 예제 코드 -

을 테스트하지
관련 문제