HBase 테이블에 마스터 정보가있는 스트림 데이터가 있습니다. 모든 행에 대해 HBase 마스터 테이블을 조회하고 프로필 정보를 얻어야합니다. 내 코드는 다음과 같습니다.foreach 내부의 Spark Streaming 필터 조건 - NullPointerException
val con = new setContext(hadoopHome,sparkMaster)
val l_sparkcontext = con.getSparkContext
val l_hivecontext = con.getHiveContext
val topicname = "events"
val ssc = new StreamingContext(l_sparkcontext, Seconds(30))
val eventsStream = KafkaUtils.createStream(ssc,"xxx.xxx.142.xxx:2181","receive_rest_events",Map(topicname.toString -> 10))
println("Kafka Stream for receiving Events..")
val profile_data = l_hivecontext.sql("select gender, income, age, riid from hbase_customer_profile")
profile_data.foreach(println)
val tabBC = l_sparkcontext.broadcast(profile_data)
eventsStream.foreachRDD(rdd => {
rdd.foreach(record => {
val subs_profile_rows = tabBC.value
val Rows = record._2.split(rowDelim)
Rows.foreach(row => {
val values = row.split(colDelim)
val riid = values(1).toInt
val cond = "riid = " + riid
println("Condition : ", cond)
val enriched_events = subs_profile_rows.filter(cond)
}) // End of Rows
}) // End of RDD
}) // End of Events Stream
불행히도 필자는 항상 필터에서 NPE를 누르십시오. 근로자 노드에서 값을 방송하기 위해 여기 몇 가지 질문과 답변을 따랐지만 아무것도 도움이되지 못했습니다. 누군가 제발 도와 줄 수 있어요.
관련
발라
는
직렬화 할 수없는 값을 사용하고 있는지 확인하십시오. – cchantep
profile_data가 foreach 내부에서 작성되어야하며 직렬화 할 수없는 것이 확실하지 않습니다. –