카프카 메시지를 소비하기 위해 스파크 스트리밍을 사용하고 있습니다. 나는 모든 메시지를 읽지 않고 kafka에서 샘플로 일부 메시지를 얻고 싶습니다. 따라서 일련의 메시지를 읽고 발신자에게 보내고 스파크 스트리밍을 중지하고 싶습니다. 현재 spark 스트리밍 컨텍스트 메서드의 awaitTermination 메서드에서 batchInterval 시간을 전달하고 있습니다. 지금은 스파크 스트리밍에서 호출자에게 처리 된 데이터를 반환하는 방법을 사용하지 않습니다. 저는 여기에 "샘플"라는 문자열 빌더에 메시지를 저장하는 그래서 그 대신 현재첫 번째 데이터 배치를 읽은 후 스파크 스트리밍을 중지합니다.
def getsample(params: scala.collection.immutable.Map[String, String]): Unit = {
if (params.contains("zookeeperQourum"))
zkQuorum = params.get("zookeeperQourum").get
if (params.contains("userGroup"))
group = params.get("userGroup").get
if (params.contains("topics"))
topics = params.get("topics").get
if (params.contains("numberOfThreads"))
numThreads = params.get("numberOfThreads").get
if (params.contains("sink"))
sink = params.get("sink").get
if (params.contains("batchInterval"))
interval = params.get("batchInterval").get.toInt
val sparkConf = new SparkConf().setAppName("KafkaConsumer").setMaster("spark://cloud2-server:7077")
val ssc = new StreamingContext(sparkConf, Seconds(interval))
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
var consumerConfig = scala.collection.immutable.Map.empty[String, String]
consumerConfig += ("auto.offset.reset" -> "smallest")
consumerConfig += ("zookeeper.connect" -> zkQuorum)
consumerConfig += ("group.id" -> group)
var data = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](ssc, consumerConfig, topicMap, StorageLevel.MEMORY_ONLY).map(_._2)
val streams = data.window(Seconds(interval), Seconds(interval)).map(x => new String(x))
streams.foreach(rdd => rdd.foreachPartition(itr => {
while (itr.hasNext && size >= 0) {
var msg=itr.next
println(msg)
sample.append(msg)
sample.append("\n")
size -= 1
}
}))
ssc.start()
ssc.awaitTermination(5000)
ssc.stop(true)
}
을 사용하고 내 코드 나는 호출자에게 반환 할.
.1, 당신의 솔루션을 사용할 때 다음 예외가 발생합니다 :'org.apache.spark.SparkException : AsynchronousListenerBus의 리스너 쓰레드 내에서 StreamingContext를 멈출 수 없습니다. ' 어떤 아이디어가 이것을 고치는 방법? – pederpansen