0
RDBMS에서 사용자 지정 스트림 수신기를 만들려고합니다.스파크 스트리밍 빈 RDD 문제
val dataDStream = ssc.receiverStream(new inputReceiver())
dataDStream.foreachRDD((rdd:RDD[String],time:Time)=> {
val newdata=rdd.flatMap(x=>x.split(","))
newdata.foreach(println) // *******This line has problem, newdata has no records
})
ssc.start()
ssc.awaitTermination()
}
class inputReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
def onStart() {
// Start the thread that receives data over a connection
new Thread("RDBMS data Receiver") {
override def run() {
receive()
}
}.start()
}
def onStop() {
}
def receive() {
val sqlcontext = SQLContextSingleton.getInstance()
// **** I am assuming something wrong in following code
val DF = sqlcontext.read.json("/home/cloudera/data/s.json")
for (data <- rdd) {
store(data.toString())
}
logInfo("Stopped receiving")
restart("Trying to connect again")
}
}
코드가 오류없이 실행 중이지만 데이터 프레임의 레코드를 인쇄하지 않습니다.
나는 다음과 같은 변경해야합니다 당신의 코드가 작동하려면 스파크 1.6 및 스칼라
'for (data <- rdd)'는 RDD를 사용하는 방법이 아닙니다. 또한, 어디에서 Dataframe을 인쇄하려고합니까? –
http://asyncified.io/2017/02/10/why-you-might-be-misusing-sparks-streaming-api/ –
여기 데이터 코드를 내 코드'dataDStream.foreachRDD (rdd : RDD [문자열], 시간 : 시간) => { newdata = rdd.flatMap (x => x.split (",")) newdata.foreach (println)' – Jhon