2017-04-19 5 views
0

카프카 (Kafka) 주제를 통해 오는 데이터 스트림이 있습니다. Spark Streaming을 사용하여 읽었습니다.스파크 스트리밍 - Kafka- createStream - RDD - 데이터 프레임

val ssc = new StreamingContext(l_sparkcontext, Seconds(30)) 
    val kafkaStream = KafkaUtils.createStream(ssc, "xxxx.xx.xx.com:2181", "new-spark-streaming-group", Map("event_log" -> 10)) 

잘 작동합니다. 내가 원하는 것은 스트림 데이터에 열을 할당하여이 파일을 작성하는 것입니다. 따라서 내가 다음

kafkaStream.foreachRDD(rdd => { 
    if (rdd.count() == 0) { 
    println("No new SKU's received in this time interval " + Calendar.getInstance().getTime()) 
    } 
    else { 
    println("No of SKUs received " + rdd.count()) 
    rdd.map(record => { 
     record._2 
    }).toDF("customer_id","sku","type","event","control_group","event_date").write.mode(SaveMode.Append).format("parquet").save(outputPath) 

그러나이 내가하시기 바랍니다 만드는 중이라서 실수 무엇 오류

java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match. 
Old column names (1): _1 
New column names (6): customer_id, sku, type, event, control_group, event_date 
    at scala.Predef$.require(Predef.scala:233) 
    at org.apache.spark.sql.DataFrame.toDF(DataFrame.scala:224) 
    at org.apache.spark.sql.DataFrameHolder.toDF(DataFrameHolder.scala:36) 
    at kafka_receive_messages$$anonfun$main$1.apply(kafka_receive_messages.scala:77) 
    at kafka_receive_messages$$anonfun$main$1.apply(kafka_receive_messages.scala:69) 

를 제공 할. 우리가지도를 분할해야할까요? 우리가 그렇게한다면 우리는 그것을 (".. columns .."로) 변환하지 않을 것입니다.

당신의 도움에 감사드립니다.

감사

발라 들러

답변

0

감사합니다. 나는 이것을 분류했다. 그것은 코딩 문제였습니다. 미래에이 작업을 수행 할 사람들을 위해 다시

발라

kafkaStream.foreachRDD(rdd => { 
    if (rdd.count() == 0) { 
    println("No new SKU's received in this time interval " + Calendar.getInstance().getTime()) 
    } 
    else { 
    println("No of SKUs received " + rdd.count()) 
    rdd.map(record => (record._2).split(",")) 
    }.map(r => (r(0).replace(Quote,"").toInt,r(1).replace(Quote,"").toInt,r(2),r(3),r(4),r(5))).toDF("customer_id","sku","type","event","control_group","event_date").write.mode(SaveMode.Append).format("parquet").save(outputPath) 
    }) 

감사합니다 아래로 다른 부분을 변경하십시오

관련 문제