2017-02-03 4 views
2

나는 카프카 입력 다음과 같은 logstash 설정이Logstash

input { 
    kafka { 
    bootstrap_servers => "localhost:9092" 
    topics => ["mytopic"] 
    } 
} 
filter { 
    json { 
    source => "message" 
    } 
} 
output { 
    stdout { 
    codec => rubydebug 
    } 
    elasticsearch { 
    hosts => ["localhost:9200"] 
    index => "my_index" 
    codec => "json" 
    document_id => "%{id}" 
    doc_as_upsert => true 
    action => "update" 
    } 
} 

내가 직면하고 문제는 내가 logstash을 실행할 때 선택 나던이다 그 주제에 대한 오래된 메시지를 처음으로 logstash를 실행하면 소비되지 않은 주제에 대한 모든 메시지가 나타납니다. 나는 이것이 새로운 토픽이고 그것이 실행될 때 logstash에 의해 선택되지 않은 메시지를 가지고 있음을 확인했다. 그것이 실행 중일 때 주제에 들어오는 메시지를 받지만 시작하기 전에 있었던 메시지는 그렇지 않습니다. 내가 구성에서 뭔가를 놓치고 있는지 또는 입력 그 자체의 특이한 점입니까? 메시지 보장은 비즈니스 요구에 가장 중요합니다.

답변

-1

kafka 입력 플러그인 설정 auto_offset_reset을 "가장 빠름"으로 설정해야합니다. 당신은 카프카에 대한 그룹 ID를 지정하지 않은 때문에

input { 
    kafka { 
    bootstrap_servers => "localhost:9092" 
    auto_offset_reset => "earliest" 
    topics => ["mytopic"] 
    } 
} 
+0

나는 그것을 시도했지만 나를 위해 일하지 않았다. – Fizi

+1

auto_offset_reset 값은 Kafka가 소비자를위한 기존 그룹을 찾을 수 없을 때만 사용된다.이것이 작동하지 않는다면 기존 그룹이 있고 해당 그룹의 일부 소비자가 이미 메시지를 소비하고 오프셋을 커밋했음을 의미합니다. 내 대답을보세요. – oh54

4

의 중요한 대화 고려 사항은 다음과 같습니다

  • 카프카의 group.id (logstash의 카프카 구성 GROUP_ID는) logstash의 기본으로 설정되어
      logstash에서, 즉 "logstash"enable.auto.commit (enable_auto_commit)에 대한
    • 기본 카프카 값은
    • 카프카의 auto.offset.reset (auto_offset_reset는) 내가 생각 logstash에 기본 값이없는 "참" 라의 카프카 기본값 테스트가 사용됩니다.

    그래서 당신은 어떤 주제에 소비자를 실행하고 두 가지 중 하나가 가능성이 일어나고, 주제에 이미 메시지를 데리러 실패 할 때 :

    1. 는 같은과 기존 그룹이 없습니다 group id를 소비자로 지정하면 Kafka의 기본 auto.offset.reset 값인 latest 값이 사용되며 소비자는 이미 존재하는 메시지를 무시합니다.
    2. 동일한 그룹 ID ("logstash")를 가진 기존 그룹이 있으며이 그룹 ID를 가진 일부 사용자는 이미 기존 메시지를 소비하고 오프셋을 커밋했습니다 (이 다른 소비자는 이전에 또는 일부 사용자가 실행했을 수도 있음). 동일한 그룹 ID를 가진 다른 소비자). 즉,이 그룹의 다른 소비자는 어떻게 든 명시 적으로 그렇게하지 않는 한 해당 메시지를 다시 소비하지 않습니다.

    그래서 당신은 가능성이 logstash에 대해 사용자가 설정 할 수 있어야한다, 어떤 카프카의 configration을 설정하고 싶은 것을

    GROUP_ID => "some_random_group"

    auto_offset_reset => "초기"

    소비자를 지금 실행하면 some_random_group에 대한 기존 오프셋이없고 재설정이 가장 빠르기 때문에 소비자는 주제의 기존 메시지를 모두 소비해야합니다 그리고 오프셋을 커밋합니다. 즉, 사용자가 다시 실행 한 모든 메시지를 사용한 후에도 기존 메시지를 소비하지 않습니다.

  • +0

    감사합니다. 이것은 많은 의미가 있습니다. 우리는 여러 소비자가 있지만 다른 주제에 대한 기계에서 실행했습니다. group_id가 지정되지 않았습니다 (기본값은 logstash입니다). 그것은 그 역할을했을까요? 생산에서 나는 오직 한 소비자를 가지려고하지만 dev에 누구나 소비자를 시작할 수 있으므로 모든 소비자들이 고유 한 그룹 ID를 가질 필요가 있는지 궁금합니다. – Fizi

    +0

    여러 소비자에 대해 동일한 그룹 ID를 지정한 경우, 즉 해당 소비자 그룹을 구성하는 경우 모두 대략 동일한 양의 메시지가 표시됩니다. 생성하는 동안 동일한 그룹 ID를 사용하는 소비자가있는 경우 메시지가 전송됩니다. dev에서는 테스트가 다른 사람들을 간섭하는 것을 원하지 않으면 모든 사람들이 다른 그룹 ID를 사용하도록 제안합니다. 반대의 경우도 마찬가지입니다. – oh54

    +0

    문제가 해결 되었습니까? – oh54