2016-08-05 5 views
6

RDD과 같은 성능을 원하지만 reduce과 같은 성능을 원하지만 연산자를 교환 할 필요는 없습니다. 즉, 나는 result을 다음과 같이 항상 "123456789"이되도록하고 싶습니다.RDD에 조치가 있습니까?

scala> val rdd = sc.parallelize(1 to 9 map (_.toString)) 
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[24] at parallelize at <console>:24 

scala> val result = rdd.someAction{ _+_ } 

우선, fold을 찾았습니다. RDD#fold의 문서를 말한다

DEF (zeroValue : T) 배하여 모든 파티션 요소마다 분할하고, 그 결과를 T 골재 (OP (T, T)의 T를 ⇒) 주어진 연관 함수 중성 "영점"다큐먼트에 필요하지 가환 없다고

참고. 그러나, 결과는 예상과 :

scala> val rdd = sc.parallelize(1 to 9 map (_.toString)) 
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[48] at parallelize at <console>:24 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res22: String = 341276895 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res23: String = 914856273 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res24: String = 742539618 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res25: String = 271468359 
+0

당신이보고있는 내용을 설명하는 문서의 다음 섹션을 놓쳤습니다. * "이것은 스칼라 같은 함수 언어의 비 분산 컬렉션에 대해 구현 된 fold 연산과는 다소 다르게 작동합니다. 분할 영역을 개별적으로 분할 한 다음 정의 된 순서에 따라 각 요소에 순차적으로 접기를 적용하지 않고 결과를 최종 결과로 접을 수 있습니다. 교환 가능하지 않은 함수의 경우 결과는 비 분산 컬렉션에 적용된 접기의 결과와 다를 수 있습니다 "* –

답변

2

있다, 이것은 원래 RDD 일에만 파티션을 병합 고려 설명하기 위해 어떤 스칼라에서이 기준을 만족 활동을 감소 내장,하지만 당신은 쉽게 mapPartitions, collect 및 지방 감소를 결합하여 자신을 구현할 수 있습니다 :

import scala.reflect.ClassTag 

def orderedFold[T : ClassTag](rdd: RDD[T])(zero: T)(f: (T, T) => T): T = { 
    rdd.mapPartitions(iter => Iterator(iter.foldLeft(zero)(f))).collect.reduce(f) 
} 

fold 의해 사용 병합 대신 비동기 및 정렬되지 않은 방법에 대해 collectreduce 및 조합을 사용하여 글로벌 순서가 보존되는 것을 보장한다. 물론

이 포함 몇 가지 추가 비용으로 제공 :

  • 약간 더 높은 메모리 풋 프린트 드라이버에.
  • 지연 시간이 훨씬 더 길다. 로컬 감소를 시작하기 전에 모든 작업이 완료 될 때까지 기다린다.
+0

도움을 주셔서 감사합니다. 즉, 모든 파티션 **은 항상 전체 RDD의 연속적인 하위 시퀀스 **입니다. 언급 된 문서가 있습니까? – Eastsun

+0

문서 관련 - 내가 아는 것도 없음. 그것은 다소 다소 주문한 방법의 모델과 계약에 의해 다소 제약을받습니다. Spark의 실제 문제는 전체 시퀀스를 결정하는 방법입니다. 일반적으로 순서에 대해 추론 할 때 두 가지 경우가 있습니다. 명시 적 정렬 (계약에 따라)을 사용할 때 b) 결정적 정렬을 생성하고 입력과 현재 점 사이에 임의의 데이터 이동이없는 입력이있는 경우. – zero323

1

으로는 순서를 보존하지 않습니다 @YuvalItzchakov fold 지적 :

scala> rdd.fold(""){ _+_ } 
res10: String = 312456879 

EDIT @의 dk14에서 언급 한 바와 같이 나는 운없이, 시도 결과를 조합 할 때 파티션이 RDD 인 경우

scala> val rdd = sc.parallelize(1 to 9 map (_.toString)).coalesce(1) 
rdd: org.apache.spark.rdd.RDD[String] = CoalescedRDD[27] at coalesce at <console>:27 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res4: String = 123456789 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res5: String = 123456789 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res6: String = 123456789 
+0

이 작업을 수행하면 계산의 병렬 처리 능력이 완전히 손실된다는 단점이 있습니다. –

+0

@YuvalItzchakov definite; 'fold '를 사용하면 파티션 된'RDD'에서 순서가 유지되지 않을 수 있습니다. – elm

+0

네, 이해합니다. 그러나 OP는 그것을 알고 있어야합니다. –

관련 문제