0
카프카 (Kafka) 주제를 통해 오는 데이터 스트림이 있습니다. Spark Streaming을 사용하여 읽었습니다.스파크 스트리밍 - Kafka- createStream - RDD - 데이터 프레임
val ssc = new StreamingContext(l_sparkcontext, Seconds(30))
val kafkaStream = KafkaUtils.createStream(ssc, "xxxx.xx.xx.com:2181", "new-spark-streaming-group", Map("event_log" -> 10))
잘 작동합니다. 내가 원하는 것은 스트림 데이터에 열을 할당하여이 파일을 작성하는 것입니다. 따라서 내가 다음
kafkaStream.foreachRDD(rdd => {
if (rdd.count() == 0) {
println("No new SKU's received in this time interval " + Calendar.getInstance().getTime())
}
else {
println("No of SKUs received " + rdd.count())
rdd.map(record => {
record._2
}).toDF("customer_id","sku","type","event","control_group","event_date").write.mode(SaveMode.Append).format("parquet").save(outputPath)
그러나이 내가하시기 바랍니다 만드는 중이라서 실수 무엇 오류
java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.
Old column names (1): _1
New column names (6): customer_id, sku, type, event, control_group, event_date
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.sql.DataFrame.toDF(DataFrame.scala:224)
at org.apache.spark.sql.DataFrameHolder.toDF(DataFrameHolder.scala:36)
at kafka_receive_messages$$anonfun$main$1.apply(kafka_receive_messages.scala:77)
at kafka_receive_messages$$anonfun$main$1.apply(kafka_receive_messages.scala:69)
를 제공 할. 우리가지도를 분할해야할까요? 우리가 그렇게한다면 우리는 그것을 (".. columns .."로) 변환하지 않을 것입니다.
당신의 도움에 감사드립니다.
감사
발라 들러