tl;dr The pivot aggregation is not directly supported by Spark Structured Streaming up to and including 2.2.0, and it does not seem to be supported in 2.3.0-SNAPSHOT either.
I use Spark 2.3.0-SNAPSHOT built from master today.
scala> spark.version
res0: String = 2.3.0-SNAPSHOT
UnsupportedOperationChecker (which you can find in the stack traces below) makes sure that (the logical plan of) a streaming query uses supported operations only.
To call pivot you have to call groupBy first, as that is the only interface that makes pivot available.
There are two issues with pivot:

1. pivot wants to know how many columns to generate values for, and hence does a collect, which is not possible with streaming Datasets.

2. pivot is by definition another aggregation (beside groupBy), and multiple aggregations are not supported by Spark Structured Streaming.

Let's look at issue 1, i.e. a pivot with no columns to pivot on defined.
val sq = spark
.readStream
.format("rate")
.load
.groupBy("value")
.pivot("timestamp") // <-- pivot with no values
.count
.writeStream
.format("console")
scala> sq.start
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
rate
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:351)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:37)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:35)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:64)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:75)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:85)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:81)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:90)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:90)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3189)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:2665)
at org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:327)
... 49 elided
The last two lines show the issue, i.e. pivot does a collect internally, and hence the exception.
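To make issue 1 concrete, here is a plain-Scala sketch (no Spark; the object and method names are mine) of what a values-less pivot has to do: before it can build any output columns it must scan the entire input to discover the distinct pivot values, which is exactly the kind of collect a streaming Dataset cannot perform.

```scala
object PivotSketch {
  // Input rows: (group key, pivot key, value)
  type Row = (String, String, Int)

  def pivot(rows: Seq[Row]): Map[String, Map[String, Int]] = {
    // Step 1: a values-less pivot must first scan ALL rows to learn the
    // distinct pivot keys -- this is the internal collect that a
    // streaming source cannot satisfy (the input never ends).
    val pivotKeys = rows.map(_._2).distinct.sorted
    // Step 2: only then can it build one output column per pivot key.
    rows.groupBy(_._1).map { case (group, rs) =>
      group -> pivotKeys.map(k => k -> rs.filter(_._2 == k).map(_._3).sum).toMap
    }
  }
}
```

With explicit values (as in `pivot("timestamp", Seq(1))` below) step 1 is skipped, which is why that variant fails differently, on issue 2 instead.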
The other issue is that even if you specified the values to pivot on, you would then hit the other problem due to multiple aggregations (and you can see that this time the check is actually for streaming, not for batch as happened in the first case).
val sq = spark
.readStream
.format("rate")
.load
.groupBy("value")
.pivot("timestamp", Seq(1)) // <-- pivot with explicit values
.count
.writeStream
.format("console")
scala> sq.start
org.apache.spark.sql.AnalysisException: Multiple streaming aggregations are not supported with streaming DataFrames/Datasets;;
Project [value#128L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#141[0] AS 1#142L]
+- Aggregate [value#128L], [value#128L, pivotfirst(timestamp#127, count(1) AS `count`#137L, 1000000, 0, 0) AS __pivot_count(1) AS `count` AS `count(1) AS ``count```#141]
+- Aggregate [value#128L, timestamp#127], [value#128L, timestamp#127, count(1) AS count(1) AS `count`#137L]
+- StreamingRelation DataSource([email protected],rate,List(),None,List(),None,Map(),None), rate, [timestamp#127, value#128L]
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:351)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:92)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:278)
... 49 elided
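If the pivot values are known up front, one possible workaround (a sketch only; the literal timestamps and the output column names are placeholders I made up) is to emulate the pivot with a single groupBy plus conditional aggregation, so that only one streaming aggregation is involved and no internal collect is needed:

```scala
import org.apache.spark.sql.functions.{col, count, when}

// One conditionally-counted column per known pivot value,
// all inside a single aggregation, which streaming does support.
val sq = spark
  .readStream
  .format("rate")
  .load
  .groupBy(col("value"))
  .agg(
    count(when(col("timestamp") === "2017-11-27 00:00:01", 1)).as("t1"),
    count(when(col("timestamp") === "2017-11-27 00:00:02", 1)).as("t2"))
  .writeStream
  .outputMode("complete") // aggregation without a watermark needs complete mode
  .format("console")
```

count ignores the nulls that when produces for non-matching rows, so each column counts only the rows whose timestamp equals that pivot value, which is what pivot("timestamp", values).count would have produced.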