RDDs
과 Datasets
사이의 전환을 의미하는 경우 두 질문에 대한 답변이 부정입니다.
RDD 파티셔닝은 RDD[(T, U)]
에 대해서만 정의되며 RDD
이 Dataset
으로 변환 된 후에 손실됩니다. 기존의 데이터 레이아웃에 도움이되는 경우도 있지만 RDDs
및 Datasets
은 각각 다른 해싱 기술 (표준 hashCode
및 MurmurHash
)을 사용합니다. 물론 사용자 정의 분할 자 RDD
을 정의하여 후자를 모방 할 수 있지만 정말 요점이 아닙니다.)
마찬가지로 Dataset
을 RDD
으로 변환하면 분할에 대한 정보가 손실됩니다.
Dataset
파티셔닝을 사용하면 joins
을 최적화하는 데 사용할 수 있습니다. 테이블이 된 경우 예를 들어 사전 분할 다음 key
에 따라 join
val n: Int = ???
val df1 = Seq(
("key1", "val1", "val2"), ("key2", "val3", "val4")
).toDF("key", "val1", "val2").repartition(n, $"key").cache
val df2 = Seq(
("key1", "val5", "val6"), ("key2", "val7", "val8")
).toDF("key", "val3", "val4").repartition(n, $"key").cache
이후 추가 교환을 필요로하지 않습니다.
df2.explain
// == Physical Plan ==
// InMemoryTableScan [key#201, val3#202, val4#203]
// +- InMemoryRelation [key#201, val3#202, val4#203], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
// +- Exchange hashpartitioning(key#201, 3)
// +- LocalTableScan [key#201, val3#202, val4#203]
//
df1.join(df3, Seq("key")).explain
// == Physical Plan ==
// *Project [key#171, val1#172, val2#173, val5#232, val6#233]
// +- *SortMergeJoin [key#171], [key#231], Inner
// :- *Sort [key#171 ASC], false, 0
// : +- *Filter isnotnull(key#171)
// : +- InMemoryTableScan [key#171, val1#172, val2#173], [isnotnull(key#171)]
// : +- InMemoryRelation [key#171, val1#172, val2#173], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
// : +- Exchange hashpartitioning(key#171, 3)
// : +- LocalTableScan [key#171, val1#172, val2#173]
// +- *Sort [key#231 ASC], false, 0
// +- *Filter isnotnull(key#231)
// +- InMemoryTableScan [key#231, val5#232, val6#233], [isnotnull(key#231)]
// +- InMemoryRelation [key#231, val5#232, val6#233], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
// +- Exchange hashpartitioning(key#231, 3)
// +- LocalTableScan [key#231, val5#232, val6#233]
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1
df1.explain
// == Physical Plan ==
// InMemoryTableScan [key#171, val1#172, val2#173]
// +- InMemoryRelation [key#171, val1#172, val2#173], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
// +- Exchange hashpartitioning(key#171, 3)
// +- LocalTableScan [key#171, val1#172, val2#173]
은 분명히 우리가 정말 하나의 결합에 그 혜택을하지 않습니다. 따라서 하나의 테이블이 복수
joins
에 사용되는 경우에만 의미가 있습니다.
val df3 = Seq(
("key1", "val9", "val10"), ("key2", "val11", "val12")
).toDF("key", "val5", "val6")
df1.join(df3, Seq("key")).join(df3, Seq("key"))
우리가 첫 번째 작업에 의해 생성 된 구조에서 도움이 될
(ReusedExchange
주의) :
// == Physical Plan ==
// *Project [key#171, val1#172, val2#173, val5#682, val6#683, val5#712, val6#713]
// +- *SortMergeJoin [key#171], [key#711], Inner
// :- *Project [key#171, val1#172, val2#173, val5#682, val6#683]
// : +- *SortMergeJoin [key#171], [key#681], Inner
// : :- *Sort [key#171 ASC], false, 0
// : : +- Exchange hashpartitioning(key#171, 200)
// : : +- *Filter isnotnull(key#171)
// : : +- InMemoryTableScan [key#171, val1#172, val2#173], [isnotnull(key#171)]
// : : +- InMemoryRelation [key#171, val1#172, val2#173], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
// : : +- Exchange hashpartitioning(key#171, 3)
// : : +- LocalTableScan [key#171, val1#172, val2#173]
// : +- *Sort [key#681 ASC], false, 0
// : +- Exchange hashpartitioning(key#681, 200)
// : +- *Project [_1#677 AS key#681, _2#678 AS val5#682, _3#679 AS val6#683]
// : +- *Filter isnotnull(_1#677)
// : +- LocalTableScan [_1#677, _2#678, _3#679]
// +- *Sort [key#711 ASC], false, 0
// +- ReusedExchange [key#711, val5#712, val6#713], Exchange hashpartitioning(key#681, 200)
또한
join
에 의해 생성 된 파티션 혜택을 누릴 수 있습니다 스파크 그래서 우리는 또 다른 join
수행하기를 원한다면
이것은 http://stackoverflow.com/questions/28850596/co-partitioned-joins-in-spark-sql?rq=1의 속일 수 있지만 토론이나 예제에 대한 링크도 참고하겠습니다. –
Do 당신은 http : //를 의미합니다. stackoverflow.com/q/30995699/1560062? – zero323