2017-02-28 1 views
1

저는 우리가 가지고있는 데이터 재생을 구현하고 싶었습니다. 그 때문에 Kafka 보존 정책을 사용해야합니다. (조인을 사용하고 있으므로 정확한 시간이 필요합니다.). 오후 8시 30 분 P.S. 나는이 같은 주제에 내 데이터를 전송 카프카 버전 0.10.1.1Kafka 보유 정책이 예상대로 작동하지 않습니다.

을 사용하고 있습니다 :

kafkaProducer.send(
        new ProducerRecord<>(kafkaTopic, 0, (long) r.get("date_time") ,r.get(keyFieldName).toString(), r) 
      ); 

그리고 나는이 같은 내 주제를 만듭니다

카프카 - 주제 --create - -zookeeper localhost : 2181 --replication-factor 1 --partitions 1 --topic myTopic
kafka-topics --zookeeper localhost --alter --topic myTopic --config retention.ms = 172800000 kafka-topics - zookeeper localhost - alter --topic myTopic --config segment.ms = 172800000

위의 설정에 따라 주제의 보유 시간을 48 시간으로 설정해야합니다.

나는 각 메시지의 실제 시간을 기록하기 위해 TimestampExtractor까지 확장합니다.

public class ConsumerRecordOrWallclockTimestampExtractor implements TimestampExtractor { 
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerRecordOrWallclockTimestampExtractor.class); 
    @Override 
    public long extract(ConsumerRecord<Object, Object> consumerRecord) { 
     LOG.info("TIMESTAMP : " + consumerRecord.timestamp() + " - Human readable : " + new Date(consumerRecord.timestamp())); 
     return consumerRecord.timestamp() >= 0.1 ? consumerRecord.timestamp() : System.currentTimeMillis(); 
    } 
} 

테스트를 위해 내 주제에 4 개의 메시지를 보냈고이 4 개의 로그 메시지를 받았습니다.

2017년 2월 28일 10시 23분 39초 INFO ConsumerRecordOrWallclockTimestampExtractor : 21 - 소인 1,488,295,086,292 인간 readble -Tue 이월 28 동부 표준시 10시 18분 6초 2,017
2017년 2월 28일 10시 24분 01 INFO ConsumerRecordOrWallclockTimestampExtractor : 21 - 소인 1,483,272,000,000 인간 readble - 일 일월 01 동부 표준시 07시 00분 0초 2,017
2017년 2월 28일 10시 26분 11초 INFO ConsumerRecordOrWallclockTimestampExtractor : 21 - 소인 1,485,820,800,000 인간 readble -mon Jan 30 19:00:00 EST 2017
2017-02-28 012 3,516,10시 27분 22초 정보의 ConsumerRecordOrWallclockTimestampExtractor : 21 - TIMESTAMP : 1488295604411 인간 readble -Tue 2월 EST 2017

내 메세지로서 두 가지가 제거 얻을 볼 것으로 예상 그래서 Kafka's retention policy에 기반/삭제 (28) 10시 26분 44초 5 분 후 (1 월 1 일과 1 월 30 일 이후 두 번째 및 세 번째 메시지) 하지만 한 시간 동안 내 주제를 소비하려고 시도했는데 내 주제를 다 소비 할 때마다 4 개의 메시지를 모두 받았습니다.

카프카 - 브로 콘솔 소비자 --zookeeper 로컬 호스트 : 2181 --from-시작 --topic에 MyTopic을

내 카프카의 설정은 다음과 같이이다 :

############################# Log Retention Policy ############################# 

# The following configurations control the disposal of log segments. The policy can 
# be set to delete segments after a period of time, or after a given size has accumulated. 
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens 
# from the end of the log. 

# The minimum age of a log file to be eligible for deletion 
log.retention.hours=168 

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining 
# segments don't drop below log.retention.bytes. 
#log.retention.bytes=1073741824 

# The maximum size of a log segment file. When this size is reached a new log segment will be created. 
log.segment.bytes=1073741824 

# The interval at which log segments are checked to see if they can be deleted according 
# to the retention policies 
log.retention.check.interval.ms=300000 

암 내가 뭔가 잘못하고 있는거야, 아니면 여기서 뭔가 놓친거야?

답변

5

카프카는 로그 세그먼트를 삭제하여 보존 정책을 구현합니다. Kafka는 활성 세그먼트를 삭제하지 않습니다. 활성 세그먼트는 파티션에 전송 된 새 메시지를 추가하는 세그먼트입니다. Kafka는 오래된 세그먼트 만 삭제합니다.새 메시지가 파티션으로 전송되며, 어느 하나가

  • 새로운 메시지 능동 세그먼트의 크기 log.segment.bytes 또는
  • 제의 타임 스탬프를 초과 할 때 카프카는 이전 세그먼트에 활성 세그먼트 롤 활성 세그먼트의 메시지가 log.roll.ms보다 오래된 당신의 예에 따라서

(기본값은 7 일입니다), 당신이 화 2월 28일 동부 표준시 10시 18분 6초 2017 후 7 일을 기다려야 할, 새 메시지를 보내고, 4 개의 초기 메시지가 모두 삭제됩니다.

+0

그렇다면 5 분 후에 1970 (매우 오래된 메시지)의 타임 스탬프가 포함 된 두 개의 메시지를 보낸 경우 어떻게 삭제되는지 알 수 있습니까? – Am1rr3zA

관련 문제