저는 우리가 가지고있는 데이터 재생을 구현하고 싶었습니다. 그 때문에 Kafka 보존 정책을 사용해야합니다. (조인을 사용하고 있으므로 정확한 시간이 필요합니다.). 오후 8시 30 분 P.S. 나는이 같은 주제에 내 데이터를 전송 카프카 버전 0.10.1.1Kafka 보유 정책이 예상대로 작동하지 않습니다.
을 사용하고 있습니다 :
kafkaProducer.send(
new ProducerRecord<>(kafkaTopic, 0, (long) r.get("date_time") ,r.get(keyFieldName).toString(), r)
);
그리고 나는이 같은 내 주제를 만듭니다
카프카 - 주제 --create - -zookeeper localhost : 2181 --replication-factor 1 --partitions 1 --topic myTopic
kafka-topics --zookeeper localhost --alter --topic myTopic --config retention.ms = 172800000 kafka-topics - zookeeper localhost - alter --topic myTopic --config segment.ms = 172800000
위의 설정에 따라 주제의 보유 시간을 48 시간으로 설정해야합니다.
나는 각 메시지의 실제 시간을 기록하기 위해 TimestampExtractor
까지 확장합니다.
public class ConsumerRecordOrWallclockTimestampExtractor implements TimestampExtractor {
private static final Logger LOG = LoggerFactory.getLogger(ConsumerRecordOrWallclockTimestampExtractor.class);
@Override
public long extract(ConsumerRecord<Object, Object> consumerRecord) {
LOG.info("TIMESTAMP : " + consumerRecord.timestamp() + " - Human readable : " + new Date(consumerRecord.timestamp()));
return consumerRecord.timestamp() >= 0.1 ? consumerRecord.timestamp() : System.currentTimeMillis();
}
}
테스트를 위해 내 주제에 4 개의 메시지를 보냈고이 4 개의 로그 메시지를 받았습니다.
2017년 2월 28일 10시 23분 39초 INFO ConsumerRecordOrWallclockTimestampExtractor : 21 - 소인 1,488,295,086,292 인간 readble -Tue 이월 28 동부 표준시 10시 18분 6초 2,017
2017년 2월 28일 10시 24분 01 INFO ConsumerRecordOrWallclockTimestampExtractor : 21 - 소인 1,483,272,000,000 인간 readble - 일 일월 01 동부 표준시 07시 00분 0초 2,017
2017년 2월 28일 10시 26분 11초 INFO ConsumerRecordOrWallclockTimestampExtractor : 21 - 소인 1,485,820,800,000 인간 readble -mon Jan 30 19:00:00 EST 2017
2017-02-28 012 3,516,10시 27분 22초 정보의 ConsumerRecordOrWallclockTimestampExtractor : 21 - TIMESTAMP : 1488295604411 인간 readble -Tue 2월 EST 2017
내 메세지로서 두 가지가 제거 얻을 볼 것으로 예상 그래서 Kafka's retention policy에 기반/삭제 (28) 10시 26분 44초 5 분 후 (1 월 1 일과 1 월 30 일 이후 두 번째 및 세 번째 메시지) 하지만 한 시간 동안 내 주제를 소비하려고 시도했는데 내 주제를 다 소비 할 때마다 4 개의 메시지를 모두 받았습니다.
카프카 - 브로 콘솔 소비자 --zookeeper 로컬 호스트 : 2181 --from-시작 --topic에 MyTopic을
내 카프카의 설정은 다음과 같이이다 :
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
암 내가 뭔가 잘못하고 있는거야, 아니면 여기서 뭔가 놓친거야?
그렇다면 5 분 후에 1970 (매우 오래된 메시지)의 타임 스탬프가 포함 된 두 개의 메시지를 보낸 경우 어떻게 삭제되는지 알 수 있습니까? – Am1rr3zA