2016-09-19 4 views
4

미래에 크기가 커질 수있는 주제 목록 (현재는 10입니다)이 있습니다. 각 주제에서 소비 할 여러 스레드 (주제 당)를 생성 할 수 있지만 주제가 많으면 주제에서 소비하는 스레드 수가 늘어납니다. 주제가 많지 않으므로 원하지 않습니다. 너무 자주 데이터를 얻으므로 쓰레드가 이상적입니다.여러 주제에 대한 카프카 소비자

단일 소비자가 모든 주제에서 소비 할 수있는 방법이 있습니까? 그렇다면 우리는 어떻게 그것을 성취 할 수 있습니까? 또한 카프카가 오프셋을 어떻게 유지할 것인가? 답변을 제안하십시오.

소비자는 주제의 정보를 가지고 있으며, 우리가 consumer.commitAsync 또는 소비자를 사용하여 COMIT 수 consumer.subscribe (Arrays.asList (항목 1, 항목 2), ConsumerRebalanceListener의 OBJ) : 우리는 다음과 같은 API를 사용하여 여러 주제를 구독 할 수 있습니다

답변

4

다음과 같이 OffsetAndMetadata 객체를 생성하여 .commitSync()를 호출합니다.

ConsumerRecords<String, String> records = consumer.poll(long value); 
for (TopicPartition partition : records.partitions()) { 
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); 
    for (ConsumerRecord<String, String> record : partitionRecords) { 
     System.out.println(record.offset() + ": " + record.value()); 
    } 
    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); 
    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); 
} 
+0

나는 알 수있다. 그러나 어떻게 오프셋이 카프카에 의해 유지 될 것인가? 또한, 단일 소비자 그룹을 갖는 것이 내 문제를 해결할 것입니까? – Apollo

+1

오프셋은 앱에 의해 커밋되고 __consumer_offsets라는 특수 오프셋 카프카 항목에 저장됩니다. 오프셋은 각 주제의 각 파티션마다 유지되므로 가입 한 주제 수에 관계없이 중요합니다. –

1

여러 스레드가 필요하지 않으므로 여러 주제에서 소모하여 하나의 소비자를 보유 할 수 있습니다. kafka-server 자체는 상태 비 저장이므로 오프셋은 동물원 관리자가 관리합니다. 소비자가 메시지를 소비 할 때마다 이후 트랙에서 각 메시지를 한 번만 처리하도록 오프셋을 사육사에게 위탁합니다. 따라서 카프카 (kafka) 오류가 발생하더라도 소비자는 마지막으로 커밋 된 오프셋의 다음 단계부터 소비하기 시작합니다.

+1

Kafka 0.9 이상부터 오프셋은 사육사 대신 Kafka 주제에 저장됩니다. –

관련 문제