2016-06-08 2 views
2

나는 다음과 같은 코드를 가지고 :스파크 RDD : 여러 reducebykey 또는 한 번만

// make a rd according to an id 
def makeRDD(id:Int, data:RDD[(VertexId, Double)]):RDD[(Long, Double)] = { ... } 
val data:RDD[(VertexId, Double)] = ... // loading from hdfs 
val idList = (1 to 100) 
val rst1 = idList.map(id => makeRDD(id, data)).reduce(_ union _).reduceByKey(_+_) 
val rst2 = idList.map(id => makeRDD(id, data)).reduce((l,r) => (l union r).reduceByKey(_+_)) 

RST1 및 RST2 샘플 결과를 얻을. 나는 rst1이 더 많은 메모리 (100 번)를 필요로하지만 단지 하나의 reduceByKey tranform을 필요로한다고 생각했다. 그러나 rst2는 메모리가 적지 만 reduceByKey 변환 (99 번)이 더 필요합니다. 그렇다면 시간과 공간의 트레이드 오프 게임입니까?

제 질문은 : 위의 분석 결과가 맞는지 또는 Spark이 내부적으로 같은 방식으로 동작을 처리하는지 여부입니다.

P.S .: rst1 union all sub rdd then reduceByKey, reducedByKey는 외부입니다. rst2 reduceByKey 하나씩, reduceByKey는 안에 있으며은 감소합니다.

+1

나는 귀하의 질문을 이해하고 있는지 잘 모르겠습니다. rst1과 rst2는 동일한 코드를 갖지만 하나는 줄이기 위해 자리 표시자를 사용하고 다른 하나는 줄이지 ​​않습니다. – eliasah

+0

rst1 union all sub rdd 그런 다음 reduceByKey가 ** outside ** reduce 인 reduceByKey입니다. rst2 reduceByKey 하나씩, reduceByKey는 ** 내부 ** 감소입니다. – bourneli

+0

오, 죄송 합니다만, rst2의 reduceByKey도 외부에 있다고 생각했습니다. – eliasah

답변

3

두 해답은 모두 상대적으로 비효율적이지만 두 번째 해답은 첫 번째 해답보다 최악입니다.

마지막 질문에 대답을 시작하겠습니다. 에 셔플을 필요로하지 않는 대신 여러 변환을 결합 완벽한 혈통

  • 을 재 계산 명시 적으로 사용

    • 또는 암시 적으로 캐시 된 작업 결과 : 낮은 수준의 RDD API의 두 (대신) 글로벌 자동 최적화의 유형이있다 단일 ShuffleMapStage

    다른 모든 것들은 거의 DAG를 정의하는 순차적 변환입니다. 이는보다 제한적인 상위 수준 인 Dataset (DataFrame) API와는 달리, 변환에 대한 특정 가정과 실행 계획의 전역 최적화를 수행합니다.

    코드와 관련하여 첫 번째 해결책의 가장 큰 문제점은 반복적 인 적용 union을 적용 할 때 늘어나는 혈통입니다. 장애 복구 비용이 많이 들고, RDD가 재귀 적으로 정의되기 때문에 StackOverflow 예외로 인해 실패 할 수 있습니다. 덜 심각한 부작용은 후속 축소 *에서 보상되지 않는 파티션 수의 증가입니다. 당신은 더 자세한 Stackoverflow due to long RDD Lineage에 대한 내 대답에 설명하지만 당신이 정말로 여기가 필요하다이 같은 단일 union을 확인할 수있는 것들 :

    sc.union(idList.map(id => makeRDD(id, data))).reduceByKey(_+_) 
    

    이 실제로 당신이 진정 기능을 감소 적용 가정 최적의 솔루션입니다.

    두 번째 해결 방법은 분명히 동일한 문제를 겪고 있지만 그럼에도 악화됩니다. 첫 번째 접근 방식은 단일 셔플로 2 단계 만 필요하지만 각 구간마다 셔플이 필요합니다. 파티션 수가 증가하고 기본값 인 HashPartitioner을 사용하면 각 데이터 조각을 디스크에 여러 번 쓰고 네트워크를 통해 여러 번 섞어 야합니다. 낮은 수준의 계산을 무시하면 각 레코드가 잘립니다. O (N) 번 N 여기서 병합 할 RDD 수입니다.

    메모리 사용에 관해서는 데이터 배포에 대해 더 많이 알지 못하더라도 명백하지 않지만 최악의 시나리오에서는 두 번째 방법이 상당히 나쁜 동작을 나타낼 수 있습니다.

    +이 일정한 공간에서 작동하는 경우 축소를위한 유일한 요구 사항은지도 측 결합 결과를 저장하는 해시 맵입니다.파티션은 전체 내용을 메모리로 읽지 않고 데이터 스트림으로 처리되기 때문에 각 작업의 총 메모리 크기는 데이터 양이 아닌 고유 키 수에 비례합니다. 두 번째 방법은 더 많은 작업이 필요하기 때문에 전체 메모리 사용량이 첫 번째 경우보다 높습니다. 평균적으로 데이터가 부분적으로 구성되기 때문에 약간 더 좋을 수 있지만 추가 비용을 보상하기는 쉽지 않습니다.


    * 당신이이 약간 다른 문제가 있지만 몇 가지 아이디어를 줄 것이다 당신이 Spark iteration time increasing exponentially when using join을 볼 수 있습니다 전체 성능에 영향을 미칠 수있는 방법을 배우고 싶은 경우에 왜 파티션 문제의 제어 번호입니다.

  • +2

    sc.union은 명백하게 수행 능력을 향상시키지 않습니다. 그러나 일반적인 optimazation 전략에서 영감을 얻었습니다. 계보 및 shullfe 줄이기, 중간 결과 저장, ** 지속 ** 및 ** 검사 점 ** 신중하게 사용하고 마지막 reduceByKey를 제거하십시오. 성능이 크게 향상되었습니다. 고마워요 @ zero323. 그리고 예를 들어, 엘리사에게 감사드립니다. – bourneli

    관련 문제