2017-05-17 1 views
0

같은 키와 다른 값을 가진 두 개의 RDD가 있습니다. 나는 둘 다에 .partitionBy(partitioner) 같은 전화를 한 후 나는 그들 가입 : 내가 얻을동일한 키를 가진 두 개의 RDD에 적용된 동일한 HashPartitioner가 균등하게 분할되지 않습니다.

val partitioner = new HashPartitioner(partitions = 4) 

val a = spark.sparkContext.makeRDD(Seq(
    (1, "A"), (2, "B"), (3, "C"), (4, "D"), (5, "E"), (6, "F"), (7, "G"), (8, "H") 
)).partitionBy(partitioner) 

val b = spark.sparkContext.makeRDD(Seq(
    (1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e"), (6, "f"), (7, "g"), (8, "h") 
)).partitionBy(partitioner) 

println("A:") 
a.foreachPartition(p => { 
    p.foreach(t => print(t + " ")) 
    println() 
}) 

println("B:") 
b.foreachPartition(p => { 
    p.foreach(t => print(t + " ")) 
    println() 
}) 

println("Join:") 
a.join(b, partitioner) 
    .foreachPartition(p => { 
    p.foreach(t => print(t + " ")) 
    println() 
}) 

을 : joinRDD는 A와 B 파티션이 다르고 왜 왜

A: 
(2,B) (3,C) (4,D) (6,F) (7,G) 
(8,H) (1,A) 
(5,E) 

B: 
(3,c) (7,g) 
(1,a) (5,e) 
(2,b) (6,f) 
(4,d) (8,h) 

Join: 
(6,(F,f)) (1,(A,a)) (2,(B,b)) (5,(E,e)) (4,(D,d)) (8,(H,h)) 
(3,(C,c)) (7,(G,g)) 

그래서 첫 번째 질문입니다 둘 다 다른가?

답변

1

분할은 모든 경우에서 동일합니다. 문제는 사용하는 방법입니다. 각 파티션은 별도의 스레드에서 처리된다는 점을 기억하십시오. 이 코드를 여러 번 실행하면 결과가 실제로 비 결정적임을 알 수 있습니다. 각 파티션의 값의 순서가 여전히 경우 비 결정적 될 수 있음을

a.glom.collect.map(_.mkString(" ")).foreach(println) 
(4,D) (8,H) 
(1,A) (5,E) 
(2,B) (6,F) 
(3,C) (7,G) 
b.glom.collect.map(_.mkString(" ")).foreach(println) 
(4,d) (8,h) 
(1,a) (5,e) 
(2,b) (6,f) 
(3,c) (7,g) 
a.join(b).glom.collect.map(_.mkString(" ")).foreach(println) 
(4,(D,d)) (8,(H,h)) 
(1,(A,a)) (5,(E,e)) 
(6,(F,f)) (2,(B,b)) 
(3,(C,c)) (7,(G,g)) 

주 : 대신에이 같은 예를 들어 뭔가

시도 비 local 컨텍스트에서 실행되지만 각 파티션의 내용은 l 위에 표시된 것과 같아야합니다.

관련 문제