2016-10-14 2 views
1

KafkaUtils를 사용하여 Kafka에서 데이터를 수신하는 스파크 스트리밍 응용 프로그램을 작성하고 Kafka에서받은 데이터를 인쇄하고 싶습니다. 여기 내 코드입니다 (내가 사용하는 내 스파크 스트리밍 작업을 실행 스파크를 제출) : 이것을 실행하면 꽤 잘 작동스파크 스트리밍 콘솔에서 RDD 출력

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet) 
messages.print() 

.

Time: 1476481700000 ms 

------------------------------------------- 
(null,a) 
(null,b) 
(null,c) 

을하지만 라인의 수를 계산 한 줄을 추가하는 경우, messages.print()이 작동하지 않을 수 있습니다 입력이 카프카 생산에 A, B, C 인 경우, 나는 불꽃이 아래 스트리밍에서 결과를 얻을 수 있습니다. 코드는 다음과 같습니다 : 나는 다음과 같은 결과를 얻고있다

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet) 
messages.print() 
messages.count().print() 

:

------------------------------------------- 
Time: 1476481800000 ms 
------------------------------------------- 
4 

만 번호가 인쇄지고, 데이터를 출력 할 수 없습니다 계산합니다. messages.print()messages.count.print()에 추가 한 후 제 질문이 실행되지 않습니다.
또 다른 질문은 null이 튜플 (null, a)(null, b)(null, c)의 약자입니다.

답변

0

print()에 문제가 없으며 두 메시지를 모두 인쇄하고 다음과 같이 계산합니다. 스크롤하여 로그를 확인하십시오.

------------------------------------------- 
Time: 1476481700000 ms 
------------------------------------------- 
(null,a) 
(null,b) 
(null,c) 

------------------------------------------- 
Time: 1476481800000 ms 
------------------------------------------- 
4 

KafkaUtils.createDirectStream 방법은 <Kafka topic, Kafka message>의 d 스트림을 반환합니다. 항목과 관련된 thisthis 게시물이 null인지 확인하십시오.

+0

과 최종 목표를 달성 할 수있다. 그들은 밖으로 인쇄된다. 그러나 나는 그들을 전에 보지 않았다. 고마워 – Frankie

0

코드가 작동하지만 대안을 제공해야합니다. 그러나이 방법은 테스트 또는 학습용으로 만 사용됩니다. 대신 두 actions을 수행, 당신은 그래, 당신이 바로 단지 하나의 action

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet) 
    //Cache your RDD before you perform any heavyweight operations. 
    messages.cache() 
    val result = messages.collect(); 
    println(result.size + " size") 
    result.foreach { input => println(input) } 
관련 문제