mqtt을 아파치 스트림으로 사용하려고합니다. 사용되는 lib가 spache-spool-sql-streaming-mqtt 인 lib를 사용합니다. 이 라이브러리는 paho mqtt 라이브러리를 사용합니다. 다음과 같이 내가 lib 디렉토리를 사용하고spark-sql-streaming-mqtt 불량 사용자 또는 암호
:
val spark = SparkSession
.builder
.appName("MQTTStreamWordCount")
.master("local[4]")
.getOrCreate()
import spark.implicits._
// Create DataFrame representing the stream of input lines from connection to mqtt server
val lines = spark.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("clientId", "sparkTest")
.option("username", "user")
.option("password", "psw")
.option("brokerUrl", "tcp://ip:1883")
.option("topic", "/bikes")
.option("cleanSession", "true")
.load("tcp://ip:1883").as[(String, Timestamp)]
val query = lines.select("value").writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
을 그리고 난이 오류가 얻을 : "잘못된 사용자 이름이나 암호를".
다른 akka/scala 프로젝트에서 동일한 사용자/psw와 동일한 브로커에서 paho-mqtt lib를 사용하면 제대로 작동합니다.
는 그래서 난이 오류와 함께 혼란 스러워요