나는 카프카 입력 다음과 같은 logstash 설정이Logstash
input {
kafka {
bootstrap_servers => "localhost:9092"
topics => ["mytopic"]
}
}
filter {
json {
source => "message"
}
}
output {
stdout {
codec => rubydebug
}
elasticsearch {
hosts => ["localhost:9200"]
index => "my_index"
codec => "json"
document_id => "%{id}"
doc_as_upsert => true
action => "update"
}
}
내가 직면하고 문제는 내가 logstash을 실행할 때 선택 나던이다 그 주제에 대한 오래된 메시지를 처음으로 logstash를 실행하면 소비되지 않은 주제에 대한 모든 메시지가 나타납니다. 나는 이것이 새로운 토픽이고 그것이 실행될 때 logstash에 의해 선택되지 않은 메시지를 가지고 있음을 확인했다. 그것이 실행 중일 때 주제에 들어오는 메시지를 받지만 시작하기 전에 있었던 메시지는 그렇지 않습니다. 내가 구성에서 뭔가를 놓치고 있는지 또는 입력 그 자체의 특이한 점입니까? 메시지 보장은 비즈니스 요구에 가장 중요합니다.
나는 그것을 시도했지만 나를 위해 일하지 않았다. – Fizi
auto_offset_reset 값은 Kafka가 소비자를위한 기존 그룹을 찾을 수 없을 때만 사용된다.이것이 작동하지 않는다면 기존 그룹이 있고 해당 그룹의 일부 소비자가 이미 메시지를 소비하고 오프셋을 커밋했음을 의미합니다. 내 대답을보세요. – oh54