2017-11-15 4 views
0

카프카 1.0을 대기열로 사용하는 응용 프로그램이 있습니다. Kafka 주제에는 80 개의 파티션과 80 개의 소비자가 실행 중입니다. (카프카 - 파이썬 소비자). 명령을 실행하여카프카 파티션 지연 증가

: 제가

파티션 중 하나가 오프셋에서 중단하고, 새로운 레코드가 추가 될 때 지연을 연속적으로 증가되는 것을 볼
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup --describe 

.

TOPIC       PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG  CONSUMER-ID          HOST 

118 mytopic      37   1924   2782   858  kafka-python-1.3.4-3da99d4d-63e8-4e72-967e-xxxxxxxxxxx/localhost 
119 mytopic      38   2741   2742   1   kafka-python-1.3.4-40b44482-39fc-42d0-8f55-xxxxxxxxxxx/localhost 
120 mytopic      39   2713   2713   0   kafka-python-1.3.4-4121d080-1d7c-4d6b-ac58-xxxxxxxxxxx/localhost 
121 mytopic      40   2687   2688   1   kafka-python-1.3.4-43441f6e-fd35-448e-b791-xxxxxxxxxxx/localhost 

무엇이 발생합니다

위 명령의 출력은 다음과 같이 보입니다? 또한이 서버는 정기적으로 수동으로 모니터링하지 않을 수 있으므로 reset-offsets 명령을 사용하여 오프셋을 재설정하는 것은 바람직하지 않습니다.

클라이언트는 리눅스 m의/C가 병렬 프로세스로 백그라운드에서 실행 :

consumer = KafkaConsumer('mytopic', group_id='mygroup', bootstrap_servers='localhost:9092', 
        session_timeout_ms=120000, heartbeat_interval_ms=100000, max_poll_records=1, 
        auto_commit_interval_ms=100000, request_timeout_ms=350000, max_partition_fetch_bytes=3*1024*1024, 
        value_deserializer=lambda m: json.loads(m.decode('ascii'))) 

for message in consumer: 
    msg = json.loads(message.value) 
    process_message(msg) 

답변

0

오프셋 소비자가 잠시 후 이동하지 않은 경우, 소비자는 중지 가능성은 입니다. 소비자 오프셋이 이동 중이지만 소비자 지연 시간이 (로그 끝과 소비자 오프셋의 차이) 인 경우 이 증가하면 소비자가 생산자보다 느려집니다. 소비자 이 느린 경우 일반적인 해결 방법은 소비자의 병렬 처리 수준을 높이는 것입니다. 주제의 파티션 수를 늘려야 할 수도 있습니다.

자세한 내용은 Kafka docs을 참조하십시오.

간단히 말하면; 당신은 당신이 소비하는 것보다 더 많이 생산하고 있습니다. 지연을 줄이려면 소비량을 늘려야합니다. 더 많은 소비자를 추가해야합니다. 테스트 만하는 경우 소비자가 느립니다.

+0

소비자는 백그라운드에서 실행중인 kafka-python 클라이언트입니다. 갑자기 멈출 수있는 이유. 클라이언트 인스턴스의 수를 확인했는데 문제가없는 것 같습니다. 소비자를 다시 시작해도 문제가 해결되지 않는 것 같습니다. – ashdnik

+0

질문 자체에 클라이언트 코드가 추가되었습니다. – ashdnik