2014-12-02 4 views
2

카프카 메시지를 소비하기 위해 스파크 스트리밍을 사용하고 있습니다. 나는 모든 메시지를 읽지 않고 kafka에서 샘플로 일부 메시지를 얻고 싶습니다. 따라서 일련의 메시지를 읽고 발신자에게 보내고 스파크 스트리밍을 중지하고 싶습니다. 현재 spark 스트리밍 컨텍스트 메서드의 awaitTermination 메서드에서 batchInterval 시간을 전달하고 있습니다. 지금은 스파크 스트리밍에서 호출자에게 처리 된 데이터를 반환하는 방법을 사용하지 않습니다. 저는 여기에 "샘플"라는 문자열 빌더에 메시지를 저장하는 그래서 그 대신 현재첫 번째 데이터 배치를 읽은 후 스파크 스트리밍을 중지합니다.

def getsample(params: scala.collection.immutable.Map[String, String]): Unit = { 
    if (params.contains("zookeeperQourum")) 
     zkQuorum = params.get("zookeeperQourum").get 
    if (params.contains("userGroup")) 
     group = params.get("userGroup").get 
    if (params.contains("topics")) 
     topics = params.get("topics").get 
    if (params.contains("numberOfThreads")) 
     numThreads = params.get("numberOfThreads").get 
    if (params.contains("sink")) 
     sink = params.get("sink").get 
    if (params.contains("batchInterval")) 
     interval = params.get("batchInterval").get.toInt 
    val sparkConf = new SparkConf().setAppName("KafkaConsumer").setMaster("spark://cloud2-server:7077") 
    val ssc = new StreamingContext(sparkConf, Seconds(interval)) 
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap 
    var consumerConfig = scala.collection.immutable.Map.empty[String, String] 
    consumerConfig += ("auto.offset.reset" -> "smallest") 
    consumerConfig += ("zookeeper.connect" -> zkQuorum) 
    consumerConfig += ("group.id" -> group) 
    var data = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](ssc, consumerConfig, topicMap, StorageLevel.MEMORY_ONLY).map(_._2) 
    val streams = data.window(Seconds(interval), Seconds(interval)).map(x => new String(x)) 
    streams.foreach(rdd => rdd.foreachPartition(itr => { 
     while (itr.hasNext && size >= 0) { 
     var msg=itr.next 
     println(msg) 
     sample.append(msg) 
     sample.append("\n") 
     size -= 1 
     } 
    })) 
    ssc.start() 
    ssc.awaitTermination(5000) 
    ssc.stop(true) 
    } 

을 사용하고 내 코드 나는 호출자에게 반환 할.

답변

3

당신은, 그 안에 다음 StreamingListener을 구현할 수 있습니다 당신이 당신이 JobListener에 SparkStreaming를 연결하는 방법입니다 ssc.stop()을

private class MyJobListener(ssc: StreamingContext) extends StreamingListener { 

    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized { 

    ssc.stop(true) 

    } 

} 

호출 할 수 있습니다 onBatchCompleted : 스파크 1.6

val listen = new MyJobListener(ssc) 
ssc.addStreamingListener(listen) 

ssc.start() 
ssc.awaitTermination() 
+1

.1, 당신의 솔루션을 사용할 때 다음 예외가 발생합니다 :'org.apache.spark.SparkException : AsynchronousListenerBus의 리스너 쓰레드 내에서 StreamingContext를 멈출 수 없습니다. ' 어떤 아이디어가 이것을 고치는 방법? – pederpansen

0

우리는 코드

var sampleMessages=streams.repartition(1).mapPartitions(x=>x.take(10)) 

의 다음 조각을 사용하여 샘플 메시지를 얻을 수 있으며, 우리가 처음 배치 한 후 중지하려면 우리는 우리 자신의 StreamingListener 인터페이스를 구현해야하며 onBatchCompleted 방법에 스트리밍을 중지해야한다.

관련 문제