2017-02-14 3 views
0

RDBMS에서 사용자 지정 스트림 수신기를 만들려고합니다.스파크 스트리밍 빈 RDD 문제

val dataDStream = ssc.receiverStream(new inputReceiver()) 
    dataDStream.foreachRDD((rdd:RDD[String],time:Time)=> { 
    val newdata=rdd.flatMap(x=>x.split(",")) 
    newdata.foreach(println) // *******This line has problem, newdata has no records 
    }) 

ssc.start() 
ssc.awaitTermination() 
} 

class inputReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging { 
    def onStart() { 
    // Start the thread that receives data over a connection 
    new Thread("RDBMS data Receiver") { 
     override def run() { 
     receive() 
     } 
    }.start() 
    } 
    def onStop() { 
    } 

    def receive() { 
    val sqlcontext = SQLContextSingleton.getInstance() 

    // **** I am assuming something wrong in following code 
    val DF = sqlcontext.read.json("/home/cloudera/data/s.json") 
    for (data <- rdd) { 
     store(data.toString()) 
    } 
    logInfo("Stopped receiving") 
    restart("Trying to connect again") 
    } 
} 

코드가 오류없이 실행 중이지만 데이터 프레임의 레코드를 인쇄하지 않습니다.

나는 다음과 같은 변경해야합니다 당신의 코드가 작동하려면 스파크 1.6 및 스칼라

+0

'for (data <- rdd)'는 RDD를 사용하는 방법이 아닙니다. 또한, 어디에서 Dataframe을 인쇄하려고합니까? –

+1

http://asyncified.io/2017/02/10/why-you-might-be-misusing-sparks-streaming-api/ –

+0

여기 데이터 코드를 내 코드'dataDStream.foreachRDD (rdd : RDD [문자열], 시간 : 시간) => { newdata = rdd.flatMap (x => x.split (",")) newdata.foreach (println)' – Jhon

답변

0

을 사용하고 있습니다 :

def receive() { 
    val sqlcontext = SQLContextSingleton.getInstance() 
    val DF = sqlcontext.read.json("/home/cloudera/data/s.json") 

    // **** this: 
    rdd.collect.foreach(data => store(data.toString())) 

    logInfo("Stopped receiving") 
    restart("Trying to connect again") 
} 

이 바람직하지 않습니다 그러나이하여 JSON 파일의 모든 데이터 때문에 것 운전자가 처리 할 수 ​​있으며 수신기가 신뢰성을 제대로 고려하지 않았습니다.

나는 Spark Streaming이 귀하의 사용 케이스에 적합하지 않다고 생각합니다. 줄 사이를 읽으면 스트리밍 중이고 적절한 제작자가 필요하거나 RDBMS에서 json으로 덤프 된 데이터를 읽는 중입니다.이 경우에는 Spark Streaming이 필요하지 않습니다.