2017-12-01 3 views
2

Spark 스트리밍 데이터 세트 (구조화 된 스트리밍)를 피벗하려하지만 AnalysisException (아래 발췌)이 표시됩니다.스트리밍 데이터 세트를 피벗하는 방법은 무엇입니까?

구조화 된 스트림 (2.0)에서 피벗 팅이 실제로 지원되지 않는다는 것을 확인해 주시겠습니까? 아마도 대체 방법을 제안 할 수 있습니까?

스레드 "main"의 예외 org.apache.spark.sql.AnalysisException : 스트리밍 소스가있는 쿼리는 writeStream.start() ;;와 함께 실행되어야합니다. kafka 에서 org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $ .org $ apache $ spark $ sql $ catalyst $ 분석 $ UnsupportedOperationChecker $$ throwError (UnsupportedOperationChecker.scala : 297) at org.apache.spark. sql.catalyst.analysis.UnsupportedOperationChecker $$ anonfun $ checkForBatch $ 1.apply (UnsupportedOperationChecker.scala : 36) at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $$ anonfun $ checkForBatch $ 1.apply (UnsupportedOperationChecker.scala : 34) org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp (TreeNode.scala에서 127)

답변

0

TL; DRpivot 응집은 2.2.0까지의 Spark Structured Streaming에서는 지원되지 않으며 2.3.0-SNAPSHOT에는 아직 지원되지 않는 것 같습니다.

저는 오늘 마스터에서 빌드 된 Spark 2.3.0-SNAPSHOT을 사용합니다. (의 논리 계획) 스트리밍 쿼리는 작업을 지원하는 사용할지 여부를

scala> spark.version 
res0: String = 2.3.0-SNAPSHOT 

UnsupportedOperationChecker

(당신이 스택 추적에서 찾을 수 있음)를 확인합니다.

pivot을 실행할 때 pivot을 사용할 수있는 유일한 인터페이스 인 경우 groupBy이 먼저 있어야합니다.

pivot으로 두 가지 문제가 있습니다

  1. pivot은 값을 생성하기 위해 얼마나 많은 열을 알고 싶어 따라서 스트리밍 데이터 집합 불가능 collect 않습니다.

  2. pivot 스트리밍 구조화 그 불꽃이

이의 정의에 피벗없이 열이 문제 하나를 살펴 보자 지원하지 않습니다 ( groupBy 옆에) 실제로 다른 집합이다.

val sq = spark 
    .readStream 
    .format("rate") 
    .load 
    .groupBy("value") 
    .pivot("timestamp") // <-- pivot with no values 
    .count 
    .writeStream 
    .format("console") 
scala> sq.start 
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();; 
rate 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:351) 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:37) 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126) 
    at scala.collection.immutable.List.foreach(List.scala:381) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126) 
    at scala.collection.immutable.List.foreach(List.scala:381) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126) 
    at scala.collection.immutable.List.foreach(List.scala:381) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126) 
    at scala.collection.immutable.List.foreach(List.scala:381) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126) 
    at scala.collection.immutable.List.foreach(List.scala:381) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126) 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:35) 
    at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:64) 
    at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:75) 
    at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:73) 
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:79) 
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:79) 
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:85) 
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:81) 
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:90) 
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:90) 
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3189) 
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2665) 
    at org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:327) 
    ... 49 elided 

마지막 두 라인 문제, 즉 pivotdoes 내부적 collect 때문에 문제를 나타낸다.

다른 문제는 열이 다음 인해 multiple aggregations에 다른 문제를 얻을 거라고에 피벗을 위해 당신이 값을 지정 것 (당신이 일어난대로 streaming하지 batch에 대한 검사가 실제로 있다고 볼 수는 없지만이다 첫 번째 경우와 함께).

val sq = spark 
    .readStream 
    .format("rate") 
    .load 
    .groupBy("value") 
    .pivot("timestamp", Seq(1)) // <-- pivot with explicit values 
    .count 
    .writeStream 
    .format("console") 
scala> sq.start 
org.apache.spark.sql.AnalysisException: Multiple streaming aggregations are not supported with streaming DataFrames/Datasets;; 
Project [value#128L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#141[0] AS 1#142L] 
+- Aggregate [value#128L], [value#128L, pivotfirst(timestamp#127, count(1) AS `count`#137L, 1000000, 0, 0) AS __pivot_count(1) AS `count` AS `count(1) AS ``count```#141] 
    +- Aggregate [value#128L, timestamp#127], [value#128L, timestamp#127, count(1) AS count(1) AS `count`#137L] 
     +- StreamingRelation DataSource([email protected],rate,List(),None,List(),None,Map(),None), rate, [timestamp#127, value#128L] 

    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:351) 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:92) 
    at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232) 
    at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278) 
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:278) 
    ... 49 elided 
관련 문제