2016-09-16 2 views
3

일부 데이터 조작 쿼리에 대해 Spark SQL을 평가하려고합니다. 나는이이에 관심이 시나리오는 : 나는 표 1 및 표 2 RDDs을 만들 수있을 것 같은SparkSQL 및 파티션 분할 사용에 대한 이해

table1: key, value1, value2 
table2: key, value3, value4 

create table table3 as 
select * from table1 join table2 on table1.key = table2.key 

이 (하지만이 문서에 그 아주 명백한 예를 볼 수 없습니다) 소리가 난다. 더 큰 질문은 - 내가 2 테이블 RDD를 키로 성공적으로 파티셔닝 한 다음 Spark SQL과 함께 참여하게되면 파티셔닝을 충분히 활용할 수 있을까요? 그리고 그 조인의 결과로 새로운 RDD를 생성한다면, 또한 분할 될 것입니까? 즉, 완전히 뒤섞이지 않습니까? 나는이 주제에 대한 문서 및 예제에 대한 조언을 정말 감사하게 생각합니다.

+0

이것은 http://stackoverflow.com/questions/28850596/co-partitioned-joins-in-spark-sql?rq=1의 속일 수 있지만 토론이나 예제에 대한 링크도 참고하겠습니다. –

+0

Do 당신은 http : //를 의미합니다. stackoverflow.com/q/30995699/1560062? – zero323

답변

3

RDDsDatasets 사이의 전환을 의미하는 경우 두 질문에 대한 답변이 부정입니다.

RDD 파티셔닝은 RDD[(T, U)]에 대해서만 정의되며 RDDDataset으로 변환 된 후에 손실됩니다. 기존의 데이터 레이아웃에 도움이되는 경우도 있지만 RDDsDatasets은 각각 다른 해싱 기술 (표준 hashCodeMurmurHash)을 사용합니다. 물론 사용자 정의 분할 자 RDD을 정의하여 후자를 모방 할 수 있지만 정말 요점이 아닙니다.)

마찬가지로 DatasetRDD으로 변환하면 분할에 대한 정보가 손실됩니다.

Dataset 파티셔닝을 사용하면 joins을 최적화하는 데 사용할 수 있습니다. 테이블이 된 경우 예를 들어 사전 분할 다음 key에 따라 join

val n: Int = ??? 

val df1 = Seq(
    ("key1", "val1", "val2"), ("key2", "val3", "val4") 
).toDF("key", "val1", "val2").repartition(n, $"key").cache 

val df2 = Seq(
    ("key1", "val5", "val6"), ("key2", "val7", "val8") 
).toDF("key", "val3", "val4").repartition(n, $"key").cache 

이후 추가 교환을 필요로하지 않습니다.

df2.explain 
// == Physical Plan == 
// InMemoryTableScan [key#201, val3#202, val4#203] 
// +- InMemoryRelation [key#201, val3#202, val4#203], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) 
//   +- Exchange hashpartitioning(key#201, 3) 
//    +- LocalTableScan [key#201, val3#202, val4#203] 
// 
df1.join(df3, Seq("key")).explain 
// == Physical Plan == 
// *Project [key#171, val1#172, val2#173, val5#232, val6#233] 
// +- *SortMergeJoin [key#171], [key#231], Inner 
// :- *Sort [key#171 ASC], false, 0 
// : +- *Filter isnotnull(key#171) 
// :  +- InMemoryTableScan [key#171, val1#172, val2#173], [isnotnull(key#171)] 
// :   +- InMemoryRelation [key#171, val1#172, val2#173], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) 
// :     +- Exchange hashpartitioning(key#171, 3) 
// :     +- LocalTableScan [key#171, val1#172, val2#173] 
// +- *Sort [key#231 ASC], false, 0 
//  +- *Filter isnotnull(key#231) 
//   +- InMemoryTableScan [key#231, val5#232, val6#233], [isnotnull(key#231)] 
//    +- InMemoryRelation [key#231, val5#232, val6#233], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) 
//      +- Exchange hashpartitioning(key#231, 3) 
//       +- LocalTableScan [key#231, val5#232, val6#233] 

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1 

df1.explain 
// == Physical Plan == 
// InMemoryTableScan [key#171, val1#172, val2#173] 
// +- InMemoryRelation [key#171, val1#172, val2#173], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) 
//   +- Exchange hashpartitioning(key#171, 3) 
//    +- LocalTableScan [key#171, val1#172, val2#173] 

은 분명히 우리가 정말 하나의 결합에 그 혜택을하지 않습니다. 따라서 하나의 테이블이 복수 joins에 사용되는 경우에만 의미가 있습니다.

val df3 = Seq(
    ("key1", "val9", "val10"), ("key2", "val11", "val12") 
).toDF("key", "val5", "val6") 

df1.join(df3, Seq("key")).join(df3, Seq("key")) 
우리가 첫 번째 작업에 의해 생성 된 구조에서 도움이 될

(ReusedExchange주의) :

// == Physical Plan == 
// *Project [key#171, val1#172, val2#173, val5#682, val6#683, val5#712, val6#713] 
// +- *SortMergeJoin [key#171], [key#711], Inner 
// :- *Project [key#171, val1#172, val2#173, val5#682, val6#683] 
// : +- *SortMergeJoin [key#171], [key#681], Inner 
// :  :- *Sort [key#171 ASC], false, 0 
// :  : +- Exchange hashpartitioning(key#171, 200) 
// :  :  +- *Filter isnotnull(key#171) 
// :  :  +- InMemoryTableScan [key#171, val1#172, val2#173], [isnotnull(key#171)] 
// :  :    +- InMemoryRelation [key#171, val1#172, val2#173], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) 
// :  :     +- Exchange hashpartitioning(key#171, 3) 
// :  :      +- LocalTableScan [key#171, val1#172, val2#173] 
// :  +- *Sort [key#681 ASC], false, 0 
// :  +- Exchange hashpartitioning(key#681, 200) 
// :   +- *Project [_1#677 AS key#681, _2#678 AS val5#682, _3#679 AS val6#683] 
// :    +- *Filter isnotnull(_1#677) 
// :     +- LocalTableScan [_1#677, _2#678, _3#679] 
// +- *Sort [key#711 ASC], false, 0 
//  +- ReusedExchange [key#711, val5#712, val6#713], Exchange hashpartitioning(key#681, 200) 
또한

join에 의해 생성 된 파티션 혜택을 누릴 수 있습니다 스파크 그래서 우리는 또 다른 join 수행하기를 원한다면

+0

이것은 DataSet의 예입니다. 좋습니다. SparkSql에 매핑됩니까? 같은 열에서 분할 된 두 개의 DF를 결합하여 Spark SQL을 사용하여 새 DF를 만드는 경우 어떻게됩니까? 결과 DF가 분할됩니까? –

+0

예, SQL과'DataFrame' API는 실행 차이가 없습니다. – zero323

+0

과 나의 이해는 그것이 단지 불꽃 2.0에 똑똑 할 것이라는 점이다, 맞습니까? –