2016-09-09 6 views
1

안녕하세요, 저는 Apache Spark Streaming을 사용하여 Twitter에서 트윗을 읽고 DataFrame으로 변환하려고합니다. 제가 아래에 붙여 넣은 접근법이 있습니다. 그러나 나는 올바른 접근법을 얻을 수 없다. 어떤 조언을 환영합니다.DStream을 데이터 프레임으로 변환

foreach 내에서 DF로 변환하면 tweetStream에서 단일 DF를 얻을 수 없다는 것을 알 수 있습니다. 나는 아마도 새로운 접근법을 가지고있을 것이다. 어떻게 접근합니까?

val tweetStream = TwitterUtils.createStream(ssc, Utils.getAuth).filter(status=>status.getLang=="en") 
     .map(status=>gson.toJson(status)) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext.implicits._ 
    tweetStream.foreachRDD({status=>val DF = status.toDF()}) 
+0

내가 foreachRDD 내부에서 계산 된 전체 DF를 얻기 위해 루프 내부 DF.merge()를 사용하여 생각하고 있었는데 {} – Ayon

답변

0
나는 그것을 시도하지 않은,하지만 어쩌면이 같은 작품

:

var df_tweets:DataFrame = null 

    dstream_tweets.foreachRDD { 
    rrd => if (df_tweets != null) { 
     df_tweets = df_tweets.unionAll(rdd.toDF) // combine previous dataframe 
    } else { 
     df_tweets = rdd.toDF() // create new dataframe 
     } 
    } 
관련 문제