2016-12-03 1 views
0
val r1 = sc.parallelize((1 to 100 by 1)).map(i => (i, i % 10)) 
val r2 = for(i <- 0 to 9) yield r1.filter(_._2 == i).repartition(2) 

val tt = r2.map {r => 
    println(s"partitions ${r.getNumPartitions}") 
    val t = r.mapPartitionsWithIndex((i, p) => { 
     val len = p.toList.size 
     p.map(j => (j._1, j._2, i, len)) 
    }) 
    t 
}.reduce(_ union _) 
println (s"total ${tt.count}") 
println(tt.collect().mkString("\n")) 

내에서 기대 외측 두 printlntotal 100을 생산 (10, 0, 1, 5) 정도 같이 tt RDD 내의 튜플을 인쇄 할이었다. 그러나 스파크 셸에서이를 실행하면 total 0 만 제공됩니다. tt RDD가 비어있는 이유는 무엇입니까? 혼란 스럽네. println 내부는 r2에있는 모든 RDD에 대해 partitions 2을 인쇄합니다. Spark 1.6.1을 사용하고 있습니다.예기치 출력 mapPartitionsWithIndex

답변

2

람다 함수에 문제가 있습니다. 파티션 반복기를 목록으로 변환하여 소비하므로, p.map(j => (j._1, j._2, i, len))을 호출 할 때 비어 있습니다! 여기

당신이 그것을 고칠 수있는 방법은 다음과 같습니다

val t = r.mapPartitionsWithIndex((i, p) => { 
    val elements = p.toArray 
    val len = elements.length 
    elements.iterator.map(j => (j._1, j._2, i, len)) 
})