2016-08-24 4 views
10

몇 가지 이상한 행동을 발견했을 때 나는 오래된 주제에 대해 몇 가지 테스트를하고있었습니다. 카프카의 로그를 읽고 나는이 메시지 "제거 (8)는 오프셋을 만료"주의 : 사실Apache Kafka 소비자 그룹의 오프셋은 어떻게 만료됩니까?

[GroupCoordinator 1001]: Stabilized group GROUP_NAME generation 37 (kafka.coordinator.GroupCoordinator) 
[GroupCoordinator 1001]: Assignment received from leader for group GROUP_NAME for generation 37 (kafka.coordinator.GroupCoordinator) 
Deleting segment 0 from log __consumer_offsets-31. (kafka.log.Log) 
Deleting segment 0 from log __consumer_offsets-45. (kafka.log.Log) 
Deleting index /data/kafka-logs/__consumer_offsets-45/00000000000000000000.index.deleted (kafka.log.OffsetIndex) 
Deleting index /data/kafka-logs/__consumer_offsets-31/00000000000000000000.index.deleted (kafka.log.OffsetIndex) 
Deleting segment 0 from log __consumer_offsets-13. (kafka.log.Log) 
Deleting index /data/kafka-logs/__consumer_offsets-13/00000000000000000000.index.deleted (kafka.log.OffsetIndex) 
Deleting segment 0 from log __consumer_offsets-11. (kafka.log.Log) 
Deleting segment 4885 from log __consumer_offsets-11. (kafka.log.Log) 
Deleting index /data/kafka-logs/__consumer_offsets-11/00000000000000004885.index.deleted (kafka.log.OffsetIndex) 
Deleting index /data/kafka-logs/__consumer_offsets-11/00000000000000000000.index.deleted (kafka.log.OffsetIndex) 
Deleting segment 0 from log __consumer_offsets-26. (kafka.log.Log) 
Deleting segment 12406 from log __consumer_offsets-26. (kafka.log.Log) 
Deleting index /data/kafka-logs/__consumer_offsets-26/00000000000000012406.index.deleted (kafka.log.OffsetIndex) 
Deleting index /data/kafka-logs/__consumer_offsets-26/00000000000000000000.index.deleted (kafka.log.OffsetIndex) 
Deleting segment 0 from log __consumer_offsets-22. (kafka.log.Log) 
Deleting segment 8643 from log __consumer_offsets-22. (kafka.log.Log) 
Deleting index /data/kafka-logs/__consumer_offsets-22/00000000000000008643.index.deleted (kafka.log.OffsetIndex) 
Deleting index /data/kafka-logs/__consumer_offsets-22/00000000000000000000.index.deleted (kafka.log.OffsetIndex) 
Deleting segment 0 from log __consumer_offsets-6. (kafka.log.Log) 
Deleting segment 9757 from log __consumer_offsets-6. (kafka.log.Log) 
Deleting index /data/kafka-logs/__consumer_offsets-6/00000000000000000000.index.deleted (kafka.log.OffsetIndex) 
Deleting index /data/kafka-logs/__consumer_offsets-6/00000000000000009757.index.deleted (kafka.log.OffsetIndex) 
Deleting segment 0 from log __consumer_offsets-14. (kafka.log.Log) 
Deleting segment 1 from log __consumer_offsets-14. (kafka.log.Log) 
Deleting index /data/kafka-logs/__consumer_offsets-14/00000000000000000001.index.deleted (kafka.log.OffsetIndex) 
Deleting index /data/kafka-logs/__consumer_offsets-14/00000000000000000000.index.deleted (kafka.log.OffsetIndex) 
[GroupCoordinator 1001]: Preparing to restabilize group GROUP_NAME with old generation 37 (kafka.coordinator.GroupCoordinator) 
[GroupCoordinator 1001]: Stabilized group GROUP_NAME generation 38 (kafka.coordinator.GroupCoordinator) 
[GroupCoordinator 1001]: Assignment received from leader for group GROUP_NAME for generation 38 (kafka.coordinator.GroupCoordinator) 
[Group Metadata Manager on Broker 1001]: Removed 8 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager) 

을, 나는이 개 질문이 :

1)이 소비자 그룹에 대한 작동 만료를 상쇄하지 어떻게?

2)이 만료 된 오프셋은 내 소비자가 auto.offset.reset = latest 일 때 아무 것도 폴링하지 않지만 auto.offset.reset = 가장 빠르지 만 마지막으로 커미트 된 오프셋에서 폴링 한 경우이 동작을 설명 할 수 있습니까?

답변

19

Kafka는 기본적으로 구성 가능한 시간 후에 커밋 된 오프셋을 삭제합니다. 매개 변수 offsets.retention.minutes을 참조하십시오. 즉, 소비자 그룹이이 시간 동안 비활성화 된 경우 (즉, 오프셋을 커밋하지 않은 경우) 오프셋이 삭제됩니다. 당신이 소비자를 시작하면

, 다음과 같은 상황이 발생 : A (유효)에 대한

  1. 모습은이
  2. 에서 다시 시작, 유효가 발견 오프셋
    1. 경우 (소비자 그룹) 오프셋 커밋 된
    2. 발견 오프셋없이 유효한 경우 auto.offset.reset 파라미터
0123에 따른 오프셋을 리셋

따라서 오프셋이 삭제되고 auto.offset.reset = latest 인 경우 소비자는 새로운 데이터가 주제에 추가 될 때까지 아무 것도 폴링하지 않습니다. auto.offset.reset = earliest 인 경우 전체 주제를 사용해야합니다.

자세한 내용은이 JIRA를 참조하십시오. https://issues.apache.org/jira/browse/KAFKA-3806

+0

감사합니다. 따라서 활성 사용자의 경우에도이 보유 시간 동안 새로운 오프셋 커밋이 없으면 오프셋이 삭제됩니까? – Enzo

+0

그룹이 활성 상태이면 오프셋이 삭제되지 않아야합니다. –

+1

이것은 반드시 사실 일 필요는 없습니다. enable.auto.commit = false를 설정하고 새 데이터가 없으면 (커밋 없음) 커밋이 만료됩니다. – b2zw2a

관련 문제