2017-11-19 4 views
0

제작자가 보낸 데이터가 소비자에게 전달되지 않는 이유를 알 수 없습니다. cloudera 가상 시스템을 연구 중입니다. 저는 생산자가 Kafka를 사용하고 소비자가 스파크 스트리밍을 사용하는 간단한 제작자 소비자를 작성하려고합니다.Kafka 및 Spark Streaming Simple Producer Consumer

스칼라의 생산자 코드 : 스칼라에서

import java.util.Properties 
import org.apache.kafka.clients.producer._ 

object kafkaProducer { 

    def main(args: Array[String]) { 
    val props = new Properties() 
    props.put("bootstrap.servers", "localhost:9092") 
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") 

    val producer = new KafkaProducer[String, String](props) 

    val TOPIC = "test" 

    for (i <- 1 to 50) { 
     Thread.sleep(1000) //every 1 second 
     val record = new ProducerRecord(TOPIC, generator.getID().toString(),generator.getRandomValue().toString()) 
     producer.send(record) 
    } 

    producer.close() 
    } 
} 

소비자 코드 :

 val stream = KafkaUtils.createStream(ssc, "localhost:9092", "spark-streaming-consumer-group", Map("test" -> 1)) 

: 문제는 소비자의 코드 라인을 변경하여 해결

import java.util 

import org.apache.kafka.clients.consumer.KafkaConsumer 

import scala.collection.JavaConverters._ 
import java.util.Properties 

import kafka.producer._ 

import org.apache.spark.rdd.RDD 
import org.apache.spark.SparkConf 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.StreamingContext._ 
import org.apache.spark.streaming.kafka._ 

object kafkaConsumer { 
     def main(args: Array[String]) { 


     var totalCount = 0L 
     val sparkConf = new SparkConf().setMaster("local[1]").setAppName("AnyName").set("spark.driver.host", "localhost") 
     val ssc = new StreamingContext(sparkConf, Seconds(2)) 
     ssc.checkpoint("checkpoint") 
     val stream = KafkaUtils.createStream(ssc, "localhost:9092", "spark-streaming-consumer-group", Map("test" -> 1)) 

     stream.foreachRDD((rdd: RDD[_], time: Time) => { 
      val count = rdd.count() 
      println("\n-------------------") 
      println("Time: " + time) 
      println("-------------------") 
      println("Received " + count + " events\n") 
      totalCount += count 
     }) 
     ssc.start() 
     Thread.sleep(20 * 1000) 
     ssc.stop() 

     if (totalCount > 0) { 
      println("PASSED") 
     } else { 
      println("FAILED") 
     } 
     } 
} 
+0

나는 생산자와 소비자를 순차적으로 시작한다고 생각하십니까 ??? – nabongs

+0

예, 저는 소비자가 실행되는 동안 소비자와 생산자를 시작합니다. – MennatAllahHany

+0

콘솔 생산자가 생산자 코드를 테스트했고 콘솔 생산자가 소비자 코드를 테스트 했습니까? Kafka - Spark 통합이 까다로울 수 있습니다 ... –

답변

0

두 번째 매개 변수는 9092가 아닌 2181 인 사육사 포트 여야하며 사육사가 연결할 수 있어야합니다. 카프카 항구 9092.

참고 : Kafka는 생산자와 소비자를 모두 실행하기 전에 터미널에서 시작해야합니다.