2016-08-02 2 views
0

HBase 테이블에 마스터 정보가있는 스트림 데이터가 있습니다. 모든 행에 대해 HBase 마스터 테이블을 조회하고 프로필 정보를 얻어야합니다. 내 코드는 다음과 같습니다.foreach 내부의 Spark Streaming 필터 조건 - NullPointerException

val con    = new setContext(hadoopHome,sparkMaster) 
val l_sparkcontext = con.getSparkContext 
val l_hivecontext = con.getHiveContext 

val topicname  = "events" 
val ssc    = new StreamingContext(l_sparkcontext, Seconds(30)) 
val eventsStream = KafkaUtils.createStream(ssc,"xxx.xxx.142.xxx:2181","receive_rest_events",Map(topicname.toString -> 10)) 
println("Kafka Stream for receiving Events..") 

val profile_data = l_hivecontext.sql("select gender, income, age, riid from hbase_customer_profile") 
profile_data.foreach(println) 
val tabBC = l_sparkcontext.broadcast(profile_data) 

eventsStream.foreachRDD(rdd => { 
    rdd.foreach(record => { 
    val subs_profile_rows = tabBC.value 
    val Rows = record._2.split(rowDelim) 
    Rows.foreach(row => { 
     val values = row.split(colDelim) 
     val riid = values(1).toInt 
     val cond = "riid = " + riid 
     println("Condition : ", cond) 
     val enriched_events = subs_profile_rows.filter(cond) 
    }) // End of Rows 
    }) // End of RDD 
}) // End of Events Stream 

불행히도 필자는 항상 필터에서 NPE를 누르십시오. 근로자 노드에서 값을 방송하기 위해 여기 몇 가지 질문과 답변을 따랐지만 아무것도 도움이되지 못했습니다. 누군가 제발 도와 줄 수 있어요.

관련

발라

+0

직렬화 할 수없는 값을 사용하고 있는지 확인하십시오. – cchantep

+0

profile_data가 foreach 내부에서 작성되어야하며 직렬화 할 수없는 것이 확실하지 않습니다. –

답변

0

귀하의 상황에 맞는 사용은 당신이 시도하는 두 개의 별도의 컨텍스트 (하나의 불꽃, 스파크 스트리밍을) 만들고있는 것처럼 약간 비린내가 ... 나에게 보이는 모습 이러한 컨텍스트간에 브로드 캐스트 변수를 공유합니다 (작동하지 않음).

우리는 비슷한 코드를 작성했습니다. 관심있는 경우에 대비하여 Splice Machine (오픈 소스)에서 어떻게했는지 보여주는 비디오가 있습니다. 나는 그 코드를 찾거나 다른 사람이 그 코드를 게시하도록 노력할 것이다.

http://community.splicemachine.com/splice-machine-tutorial-video-configuring-kafka-feed-splice-machine-part/

http://community.splicemachine.com/splice-machine-tutorial-video-configuring-kafka-feed-splice-machine-ii/

행운

.

+0

감사합니다. 나는 비디오를 살펴볼 것이다. 요구 사항은 HBase 테이블에서 DStream에서 가져온 데이터에 대한 프로파일 정보를 읽는 것입니다. 나는 forEachPartition을 얻었지만 (다음 주석으로 변경된 코드를 게시 할 것이다), 그러나 그것은 다른 에러를 준다. 나는 당신이 그것을 얻을 수 있다면 코드를 기다릴 것이다. 도와 주셔서 대단히 감사합니다. –

+0

공간 제한으로 인해 코드를 두 개의 게시물로 나눠야합니다. - 시작 클래스 setContext (argHadoopHome : String, argSparkMaster : String) { System.setProperty ("hadoop.home.dir", argHadoopHome) val conf = new sparkConf(). setMaster (argSparkMaster); conf.setAppName ("Evts"); 어레이 (conf의) 전용 브로 l_hiveContext = 새로운 HiveContext (l_valSparkContext) DEF getSparkContext = l_valSparkContext DEF getHiveContext = l_hiveContext DEF getconfContext = conf의 } –

+0

오브젝트 receiveEvents { DEF 주 (인수 전용 브로 l_valSparkContext = 새로운 SparkContext [문자열]) : 단위 = { var rD = "\ r \ n" var cD = "," var sM = "스파크 : // nm2 : 7077" var ip = "nm2 : 2181" var hadoopHome = "/ home/.." val con = 새 setContext (ip, sM) val l_sparkcontext = con.getSparkContext 발 topicname = "EVT" 발에 SSC = 새로운 StreamingContext (l_sparkcontext, 초 (9)) 발 eventsStream \t = KafkaUtils.createStream (SSC, "NM2 : 2181", "RCV"지도 (topicname.toString -> 2)) val profile_data = w_hivecontext.sql ("hb_cust_pro에서 성별, 수입, 연령 선택") –

관련 문제