2017-04-08 3 views
3

실험용 카프카 환경을 3 개의 브로커와 3 개의 파티션이있는 주제로 설정했습니다. 제작자와 소비자가 있습니다. 특정 소비자를위한 파티션의 오프셋을 수정하고 싶습니다. kafka 문서에서 소비자 커밋/페치 API 인 kafka가 특정 오프셋을 커밋하거나 소비자가 읽은 최신 오프셋을 가져올 수 있음을 읽었습니다. 나는 특정 소비자에서 오프셋을 가져 오기 위해 내 코드를 작성하기 위해 아래 페이지에서 코드를 사용한kafka 소비자 반입 API가 올바른 오프셋 값을 반환하지 않습니다.

https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI

: 여기 API에 대한 링크입니다. 그러나 fetch API는 요청 된 오프셋에 대해 "-1"값을 반환합니다.
https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka

나는 또한 첫 번째 링크에서 읽은 어떤 브로커가 (오류 코드를 설정하지 않는 소비자 그룹에서 주제 파티션과 관련이 상쇄되는 경우 는 "이 없기 때문에 것을 : 여기 예제 코드는 실제로 오류가 아님), 빈 메타 데이터를 반환하고 오프셋 필드를 -1로 설정합니다. "

그러나 일부 메시지를 생성 했으므로 소비자가 메시지를 소비하고 각 읽음 메시지에 대한 오프셋을 출력했습니다.

누구든지 도움을받을 수 있다면 정말 감사 할 것입니다. 내 코드의 어느 부분이 잘못된 것인지 알고 싶습니다. 아니면 API에 문제가있을 수 있습니다. 유용한 의견을 말하기를 주저하지 마십시오. 내 코드는 내가 제공 한 링크의 코드와 정확히 같습니다. 그러나 만약 당신이 내 코드를 볼 필요가 여기에 넣어 말해.

중개인 : 1 : 포트 9093

브로커 2 : 포트 9094

브로커 3 : 포트 9095

카프카 버전은 내 카프카의 설정은 0.10.2.0

입니다

주제 : "testpic3"

.......... ............

소비자 구성은 :

props.put("group.id", "test"); 

props.put("client.id", "MyConsumer"); 

................ 여기

내 코드입니다 :

public class KafkaOffsetManage { 

public static void main(String[] args) { 


    BlockingChannel channel = new BlockingChannel("localhost", 9095, 
      BlockingChannel.UseDefaultBufferSize(), 
      BlockingChannel.UseDefaultBufferSize(), 
      5000 /* read timeout in millis */); 
    channel.connect(); 
    final String MY_GROUP = "test"; 
    final String MY_CLIENTID = "MyConsumer"; 
    int correlationId = 0; 
    final TopicAndPartition testPartition0 = new TopicAndPartition("testpic3",0); 
    final TopicAndPartition testPartition1 = new TopicAndPartition("testpic3",1); 
    final TopicAndPartition testPartition2 = new TopicAndPartition("testpic3",2); 
    channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID)); 
    ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer()); 
    System.out.println("+++++++++++++++++++++++++++"); 

    System.out.println(metadataResponse.errorCode()); 

    if (metadataResponse.errorCode() == ErrorMapping.NoError()) { 
     Broker offsetManager = metadataResponse.coordinator(); 
     // if the coordinator is different, from the above channel's host then reconnect 
     channel.disconnect(); 
     channel = new BlockingChannel(offsetManager.host(), offsetManager.port(), 
       BlockingChannel.UseDefaultBufferSize(), 
       BlockingChannel.UseDefaultBufferSize(), 
       5000 /* read timeout in millis */); 
     channel.connect(); 
     System.out.println("Connected to Offset Manager"); 
     System.out.println(offsetManager.host() + ", Port:"+ offsetManager.port()); 

    } else { 
     // retry (after backoff) 
    } 



    // How to fetch offsets 


    List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>(); 
    partitions.add(testPartition0); 
    //partitions.add(testPartition1); 
    OffsetFetchRequest fetchRequest = new OffsetFetchRequest(
      MY_GROUP, 
      partitions, 
      (short) 2 /* version */, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper 
      correlationId, 
      MY_CLIENTID); 
    try { 
     channel.send(fetchRequest.underlying()); 
     OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer()); 
     OffsetMetadataAndError result = fetchResponse.offsets().get(testPartition0); 

     short offsetFetchErrorCode = result.error(); 
     if (offsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) { 
      channel.disconnect(); 
      // Go to step 1 and retry the offset fetch 
     } else if (offsetFetchErrorCode == ErrorMapping.OffsetsLoadInProgressCode()) { 
      // retry the offset fetch (after backoff) 
     } else { 
      long retrievedOffset = result.offset(); 
      String retrievedMetadata = result.metadata(); 
      System.out.println("The retrieved offset is:"+ Long.toString(retrievedOffset)); 
      System.out.println(retrievedMetadata); 
      System.out.println(result.toString()); 
     } 
    } 
    catch (Exception e) { 
     channel.disconnect(); 
     // Go to step 1 and then retry offset fetch after backoff 
    } 
} 
} 

코드의 출력은 여기에 있습니다 :

+++++++++++++++++++++++++++ 
0 

Connected to Offset Manager 

user-virtual-machine, Port:9093 
------------------------ 
The retrieved offset is:-1 

OffsetMetadataAndError[-1,,3] 

Process finished with exit code 0 

하나 이상한 것은 내가 Kafka 의존성에 관한 것입니다. 나는이 종속성을 추가 할 때, 내 코드는 프로그램의 일부 클래스를 인식하지 않습니다 "ConsumerMetadataRequest"와 "ConsumerMetadataResponse"는 인식되지 않습니다

<artifactId>kafka_2.10</artifactId> 
<version>0.10.2.0</version> 

클래스를.

그래서 내가 대신이 종속성을 추가 :
<artifactId>kafka_2.10</artifactId> 
<version>0.8.2.0</version> 

감사합니다,

+0

코드의 관련 부분을 제공하십시오. 포함 된 링크가 어느 시점에서 작동을 멈추고 질문이 쓸모 없게 될 수도 있습니다. –

+0

Patrick 대단히 감사합니다. 여기에 내 코드가있다 – Farhad9660

답변

0

을 당신이 당신의 종속성으로

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.10</artifactId> 
    <version>0.10.2.0</version> 
</dependency> 

을 추가 한 가정합니다. 이것은 카프카 그 자체입니다. 당신이 카프카 0.10.2에 생산/소모에 필요한 것은 : 소모 (주어진 소비자의 오프셋 (offset) 조작)에 대한

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-clients</artifactId> 
    <version>0.10.2.0</version> 
</dependency> 

클래스는 자바 독을 설명하고 Committing and fetching consumer offsets in Kafka보다 편리하고있다 KafkaConsumer를 사용합니다. 그 외에도

, 당신은 여전히 ​​당신이 할 수있는 당신이 링크 된 예에서 코드 문제를 사용하려면 : 당신은 하나 개의 파티션을 추가 할 가능성이 파티션에 있다는 것을이

List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>(); 
    partitions.add(testPartition0); 

을 메시지가 없습니다 (3 개의 파티션이있어서 보내시는 메시지가 다른 2 개의 메시지로 이어질 수 있습니다). 카프카에서는 각 파티션이 분리되어 있으며 소비자 그룹마다 각 파티션별로 다른 오프셋이 있습니다.

+0

친애하는 reynev, 나는 kafka 클라이언트에 대한 종속성을 추가했습니다. 그러나 문제는 여전히 존재합니다. 그럼에도 불구하고 약 종속성에 대한 포인트가있을 수 있습니다. 하나의 이상한 것은 Kafka 의존성에 관한 것입니다. kafka_2.10 0.10.2.0 위의 종속성을 추가하면 내 코드가 프로그램의 일부 클래스를 인식하지 못합니다. 클래스 : ConsumerMetadataRequest 및 ConsumerMetadataResponse가 인식되지 않습니다. 그래서이 종속성을 대신 추가했습니다 : kafka_2.10 0.8.2.0 Farhad9660

+0

이 코드는이 종속성과 잘 작동하지만 출력이 올바르지 않습니다. 기본 문제가 종속성인지 확실하지 않습니다. 어쨌든 kafka 버전 0.10.2.0에 대한 종속성을 추가 할 수 없습니다. 또한, 3 개의 파티션에 메시지를 인쇄했기 때문에 파티션에 아무런 문제가 없습니다. 나는 partitions.add (testPartition1)와 partitions.add (testPartition2)도 시도했다. – Farhad9660

+0

내 조언은'kadka_2.10'에서 자바 API 인'kafka-clients'를 카프카를 사용하고자하는 응용 프로그램으로 변경하는 것입니다. 예를 들어 코드를 작성해 드리겠습니다 – reynev

관련 문제