2014-11-06 6 views
0

Kafka와 Spark-Streaming 사이에 문제가 있습니다. 저급 트래픽 (초당 약 12000 - 15000 레코드/초) 서비스가 처음에는 생산량이 많았습니다. 그러나 10 - 15 분 후, 갑자기 소비 속도는 거의 1/10 남았습니다. 네트워크의 트래픽 문제 일 수 있습니까? 카프카Kafka 소비자의 누락 된 기록

구성 :
num.network.threads = 2
num.io.threads = 8
socket.send.buffer.bytes = 1048576
socket.receive.buffer.bytes = 1048576 = 104857600
log.flush.interval.messages = 10000
log.flush.interval.ms = 1000
log.retention.hours = 12
log.segm
socket.request.max.bytes ent.bytes = 536,870,912
log.retention.check.interval.ms = 60000
log.cleaner.enable = 거짓
log.cleanup.interval.mins = 스파크 스트리밍 1

구성 (소비자) :

.... 
val kafkaParams = Map(
    "zookeeper.connect" -> zkQuorum, 
    "group.id" -> group, 
    "zookeeper.connection.timeout.ms" -> "1000000", 
    "zookeeper.sync.time.ms" -> "200", 
    "fetch.message.max.bytes" -> "2097152000", 
    "queued.max.message.chunks" -> "1000", 
    "auto.commit.enable" -> "true", 
    "auto.commit.interval.ms" -> "1000") 

try { 
    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
     ssc, kafkaParams, topics.map((_, partition)).toMap, 
     StorageLevel.MEMORY_ONLY).map { 
     case (key, value) => convertTo(key, value) 
    }.filter { 
     _ != null 
    }.foreachRDD(line => saveToHBase(line, INPUT_TABLE)) 
    //}.foreachRDD(line => logger.info("handling testing....."+ line)) 
    } catch { 
    case e: Exception => logger.error("consumerEx: " + e.printStackTrace) 
    } 

답변

관련 문제