2017-11-10

I am trying to run Spark Structured Streaming over MQTT using Apache Bahir, by modifying the provided sample wordcount example. What causes "java.lang.NoSuchMethodError: org.eclipse.paho.client.mqttv3.MqttConnectOptions.setAutomaticReconnect(Z)V"?

Spark version: spark-2.2.0-bin-hadoop2.7.

I am using this command to run the program:

bin\spark-submit --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.2.0 mqtt.py

Here is my code:

# mqtt.py 

from __future__ import print_function 
import sys 
from pyspark.sql import SparkSession 
from pyspark.sql.functions import explode 
from pyspark.sql.functions import split 
if __name__ == "__main__": 
    spark = SparkSession\ 
     .builder\ 
     .appName("StructuredNetworkWordCount")\ 
     .getOrCreate() 
    broker_uri = 'xyz.com:1883' 
    lines = (spark
     .readStream
     .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
     .option("topic", "xyz")
     .load("tcp://{}".format(broker_uri)))

    # Split the lines into words 
    words = lines.select(
     # explode turns each item in an array into a separate row 
     explode(
      split(lines.value, ' ') 
     ).alias('word') 
    ) 

    # Generate running word count 
    wordCounts = words.groupBy('word').count() 

    # Start running the query that prints the running counts to the console 
    query = wordCounts\ 
     .writeStream\ 
     .outputMode('complete')\ 
     .format('console')\ 
     .start() 

    query.awaitTermination() 

However, when the query runs I get the following error:

17/11/09 19:48:14 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint 
17/11/09 19:48:16 INFO StreamExecution: Starting [id = e0335f31-f3e0-4ee1-a774-52582268845c, runId = f6a87268-164c-4eab-82db-1ac0bacd2bad]. Use C:\Users\xyz\AppData\Local\Temp\temporary-42cbc22f-7c1d-413c-b81c-3d4496f8e297 to store the query checkpoint. 
17/11/09 19:48:16 WARN MQTTStreamSourceProvider: If `clientId` is not set, a random value is picked up. 
Recovering from failure is not supported in such a case. 
17/11/09 19:48:16 ERROR StreamExecution: Query [id = e0335f31-f3e0-4ee1-a774-52582268845c, runId = f6a87268-164c-4eab-82db-1ac0bacd2bad] terminated with error 
java.lang.NoSuchMethodError: org.eclipse.paho.client.mqttv3.MqttConnectOptions.setAutomaticReconnect(Z)V 
     at org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider.createSource(MQTTStreamSource.scala:219) 
     at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:243) 
     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:158) 
     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:155) 
     at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194) 
     at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80) 
     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:155) 
     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:153) 
     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267) 
     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267) 
     at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) 
     at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266) 
     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) 
     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) 
     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306) 
     at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187) 
     at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304) 
     at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272) 
     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) 
     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) 
     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306) 
     at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187) 
     at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304) 
     at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272) 
     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) 
     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) 
     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306) 
     at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187) 
     at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304) 
     at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272) 
     at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256) 
     at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan$lzycompute(StreamExecution.scala:153) 
     at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan(StreamExecution.scala:147) 
     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:276) 
     at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206) 
Exception in thread "stream execution thread for [id = e0335f31-f3e0-4ee1-a774-52582268845c, runId = f6a87268-164c-4eab-82db-1ac0bacd2bad]" java.lang.NoSuchMethodError: org.eclipse.paho.client.mqttv3.MqttConnectOptions.setAutomaticReconnect(Z)V 
     at org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider.createSource(MQTTStreamSource.scala:219) 
     at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:243) 
     ... (same stack trace as above) 
Traceback (most recent call last): 
    File "C:/Users/xyz/Documents/Fall-17/Transportation/spark/mqtt.py", line 84, in <module> 
     query.awaitTermination() 
    File "C:\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\streaming.py", line 106, in awaitTermination 
    File "C:\spark\spark-2.2.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py", line 1133, in __call__ 
    File "C:\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\utils.py", line 75, in deco 
pyspark.sql.utils.StreamingQueryException: u'org.eclipse.paho.client.mqttv3.MqttConnectOptions.setAutomaticReconnect(Z)V\n=== Streaming Query ===\nIdentifier: [id = e0335f31-f3e0-4ee1-a774-52582268845c, runId = f6a87268-164c-4eab-82db-1ac0bacd2bad]\nCurrent Committed Offsets: {}\nCurrent Available Offsets: {}\n\nCurrent State: INITIALIZING\nThread State: RUNNABLE' 
17/11/09 19:48:16 INFO SparkContext: Invoking stop() from shutdown hook 

Can anyone help me figure out where I am going wrong?

Answer


I had the exact same problem and solved it. Your situation may differ from mine. I know I should comment instead of answering, but my reputation is too low to comment.

My Maven project included dependencies on both org.apache.bahir:spark-streaming-mqtt_2.11:2.2.0 and org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.2.0.

spark-sql-streaming-mqtt_2.11 depends on org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0.

spark-streaming-mqtt_2.11 depends on org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.0.2.
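Which Paho version each Bahir artifact actually pulls in can be confirmed from inside the project with Maven's dependency tree (a quick diagnostic, not part of the original answer):

```
# Show only the org.eclipse.paho entries in the resolved dependency tree
mvn dependency:tree -Dincludes=org.eclipse.paho
```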

Paho 1.1.0 has MqttConnectOptions.setAutomaticReconnect, but 1.0.2 does not.
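The failure mode is classic classpath shadowing: both Paho versions get resolved, the older one is found first, and the newer method is simply missing at runtime. A minimal Python analogy of the mechanism (the module name and the `set_automatic_reconnect` helper are purely hypothetical, standing in for the JVM's first-hit-wins class loading):

```python
import sys
import tempfile
import pathlib

# Two directories each provide a module named `paho`; the old one lacks
# set_automatic_reconnect, just as Paho 1.0.2 lacks setAutomaticReconnect.
old_dir = pathlib.Path(tempfile.mkdtemp())
new_dir = pathlib.Path(tempfile.mkdtemp())
(old_dir / "paho.py").write_text("VERSION = '1.0.2'\n")
(new_dir / "paho.py").write_text(
    "VERSION = '1.1.0'\n"
    "def set_automatic_reconnect(flag):\n"
    "    return flag\n"
)

# The *old* copy comes first on the path, so it shadows the new one.
sys.path[:0] = [str(old_dir), str(new_dir)]
import paho

print(paho.VERSION)                              # 1.0.2 wins
print(hasattr(paho, "set_automatic_reconnect"))  # False -> "no such method"
```

The same lookup order applies on the Spark driver and executors: whichever Paho jar appears first on the classpath supplies MqttConnectOptions for every caller.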

After removing the spark-streaming-mqtt_2.11 dependency, it works.
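If you need to keep both Bahir artifacts, an alternative sketch is to exclude the stale Paho client from spark-streaming-mqtt so that only 1.1.0 is resolved (untested; coordinates assumed from the versions above):

```xml
<dependency>
  <groupId>org.apache.bahir</groupId>
  <artifactId>spark-streaming-mqtt_2.11</artifactId>
  <version>2.2.0</version>
  <exclusions>
    <!-- Drop the transitive Paho 1.0.2 client; spark-sql-streaming-mqtt
         still brings in 1.1.0, which has setAutomaticReconnect -->
    <exclusion>
      <groupId>org.eclipse.paho</groupId>
      <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```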
