2016-10-11 3 views
1

mqtt을 아파치 스트림으로 사용하려고합니다. 사용되는 lib가 spache-spool-sql-streaming-mqtt 인 lib를 사용합니다. 이 라이브러리는 paho mqtt 라이브러리를 사용합니다. 다음과 같이 내가 lib 디렉토리를 사용하고spark-sql-streaming-mqtt 불량 사용자 또는 암호

:

val spark = SparkSession 
    .builder 
    .appName("MQTTStreamWordCount") 
    .master("local[4]") 
    .getOrCreate() 

import spark.implicits._ 
// Create DataFrame representing the stream of input lines from connection to mqtt server 
val lines = spark.readStream 
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") 
    .option("clientId", "sparkTest") 
    .option("username", "user") 
    .option("password", "psw") 
    .option("brokerUrl", "tcp://ip:1883") 
    .option("topic", "/bikes") 
    .option("cleanSession", "true") 
    .load("tcp://ip:1883").as[(String, Timestamp)] 

    val query = lines.select("value").writeStream 
    .outputMode("append") 
    .format("console") 
    .start() 



query.awaitTermination() 

을 그리고 난이 오류가 얻을 : "잘못된 사용자 이름이나 암호를".

다른 akka/scala 프로젝트에서 동일한 사용자/psw와 동일한 브로커에서 paho-mqtt lib를 사용하면 제대로 작동합니다.

는 그래서 난이 오류와 함께 혼란 스러워요

답변

0

솔루션 :

"org.eclipse.paho :

  1. 사용 PAHO - MQTT LIB 버전 1.1.0 autoReconnect 방법을 가지고 "%"org.eclipse.paho.client.mqttv3 "%"1.1.0 "

  2. 인증이 e에 없기 때문에 github 소스에서 자신의 bahir spark-sql-streaming-mqtt를 빌드하십시오 xisting 릴리스. https://github.com/apache/bahir

관련 문제