0

저는 프로덕션 서버에 Kafka 0.10.0 및 zookeeper 3.4.6을 사용하고 있습니다. 각각 약 50 개의 파티션으로 20 개의 주제가 있습니다. 각기 다른 주제와 파티션에 가입 한 총 100 명의 소비자가 있습니다. 모든 소비자는 동일한 그룹 ID를 사용하고 있습니다. 소비자가 특정 주제에 대해 추가되거나 제거되는 경우 다른 주제에 연결된 소비자도 재조정을 받게됩니까?Kafka 소비자 그룹 ID 및 소비자 균형 조정 문제

내 소비자 코드는 다음과 같습니다

public static void main(String[] args) { 
     String groupId = "prod" 
     String topicRegex = args[0] 
     String consumerTimeOut = "10000" 
     int n_threads = 1 
     if (args && args.size() > 1) { 
      ConfigLoader.init(args[1]) 
     } 
     else { 
      ConfigLoader.init('development') 
     } 
     if(args && args.size() > 2 && args[2].isInteger()){ 
      n_threads = (args[2]).toInteger() 
     } 

     ExecutorService executor = Executors.newFixedThreadPool(n_threads) 
     addShutdownHook(executor) 
     String zooKeeper = ConfigLoader.conf.zookeeper.hostName 
     List<Runnable> taskList = [] 
     for(int i = 0; i < n_threads; i++){ 
      KafkaConsumer example = new KafkaConsumer(zooKeeper, groupId, topicRegex, consumerTimeOut) 
      taskList.add(example) 
     } 
     taskList.each{ task -> 
      executor.submit(task) 
     } 
     executor.shutdown() 
     executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS) 
    } 

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId, String consumerTimeOut) { 

     Properties props = new Properties() 
     props.put("zookeeper.connect", a_zookeeper) 
     props.put("group.id", a_groupId) 
     props.put("zookeeper.session.timeout.ms", "10000") 
     props.put("rebalance.backoff.ms","10000") 
     props.put("zookeeper.sync.time.ms","200") 
     props.put("rebalance.max.retries","10") 
     props.put("enable.auto.commit", "false") 
     props.put("consumer.timeout.ms", consumerTimeOut) 
     props.put("auto.offset.reset", "smallest") 
     return new ConsumerConfig(props) 

    } 

public void run(String topicRegex) { 
     String threadName = Thread.currentThread().getName() 
     logger.info("{} [{}] main Starting", TAG, threadName) 
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>() 
     List<KafkaStream<byte[], byte[]>> streams = consumer.createMessageStreamsByFilter(new Whitelist(topicRegex),1) 
     ConsumerConnector consumerConnector = consumer 

     for (final KafkaStream stream : streams) { 
      ConsumerIterator<byte[], byte[]> consumerIterator = stream.iterator() 
      List<Object> batchTypeObjList = [] 
      String topic 
      String topicObjectType 
      String method 
      String className 
      String deserialzer 
      Integer batchSize = 200 
      while (true){ 
       boolean hasNext = false 
       try { 
        hasNext = consumerIterator.hasNext() 
       } catch (InterruptedException interruptedException) { 
        //if (exception instanceof InterruptedException) { 
        logger.error("{} [{}]Interrupted Exception: {}", TAG, threadName, interruptedException.getMessage()) 
        throw interruptedException 
        //} else { 
       } catch(ConsumerTimeoutException timeoutException){ 
        logger.error("{} [{}] Timeout Exception: {}", TAG, threadName, timeoutException.getMessage()) 
        topicListMap.each{ eachTopic, value -> 
         batchTypeObjList = topicListMap.get(eachTopic) 
         if(batchTypeObjList != null && !batchTypeObjList.isEmpty()) { 
          def dbObject = topicConfigMap.get(eachTopic) 
          logger.debug("{} [{}] Timeout Happened.. Indexing remaining objects in list for topic: {}", TAG, threadName, eachTopic) 
          className = dbObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY) 
          method = dbObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY) 
          int sleepTime = 0 
          if(dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS) != null) 
           sleepTime = dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS)?.toInteger() 
          executeMethod(className, method, batchTypeObjList) 
          batchTypeObjList.clear() 
          topicListMap.put(eachTopic,batchTypeObjList) 
          sleep(sleepTime) 
         } 
        } 
        consumer.commitOffsets() 
        continue 
       } catch(Exception exception){ 
        logger.error("{} [{}]Exception: {}", TAG, threadName, exception.getMessage()) 
        throw exception 
       } 
       if(hasNext) { 
        def consumerObj = consumerIterator.next() 
        logger.debug("{} [{}] partition name: {}", TAG, threadName, consumerObj.partition()) 
        topic = consumerObj.topic() 
        DBObject dbObject = topicConfigMap.get(topic) 
        logger.debug("{} [{}] topic name: {}", TAG, threadName, topic) 
        topicObjectType = dbObject.get(KafkaTopicConfigEntity.TOPIC_OBJECT_TYPE_KEY) 
        deserialzer = KafkaConfig.DEFAULT_DESERIALIZER 
        if(KafkaConfig.DESERIALIZER_MAP.containsKey(topicObjectType)){ 
         deserialzer = KafkaConfig.DESERIALIZER_MAP.get(topicObjectType) 
        } 
        className = dbObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY) 
        method = dbObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY) 
        boolean isBatchJob = dbObject.get(KafkaTopicConfigEntity.IS_BATCH_JOB_KEY) 
        if(dbObject.get(KafkaTopicConfigEntity.BATCH_SIZE_KEY) != null) 
         batchSize = dbObject.get(KafkaTopicConfigEntity.BATCH_SIZE_KEY) 
        else 
         batchSize = 1 
        Object queueObj = (Class.forName(deserialzer)).deserialize(consumerObj.message()) 
        int sleepTime = 0 
        if(dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS) != null) 
         sleepTime = dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS)?.toInteger() 
        if(isBatchJob == true){ 
         batchTypeObjList = topicListMap.get(topic) 
         batchTypeObjList.add(queueObj) 
         if(batchTypeObjList.size() == batchSize) { 
          executeMethod(className, method, batchTypeObjList) 
          batchTypeObjList.clear() 
          sleep(sleepTime) 
         } 
         topicListMap.put(topic,batchTypeObjList) 
        } else { 
         executeMethod(className, method, queueObj) 
         sleep(sleepTime) 
        } 
        consumer.commitOffsets() 
       } 
      } 
      logger.debug("{} [{}] Shutting Down Process ", TAG, threadName) 
     } 
    } 

어떤 도움이 appriciated됩니다.

+0

예. 모든 소비자 인스턴스가 동일한 그룹 ID를 공유하면 소비자 수 변경으로 인해 그룹에 대한 재조정이 트리거됩니다. – amethystic

답변

2

소비자가 소비자 그룹을 떠날 때나 가입 할 때마다 전체 그룹이 다시 균형을 조정합니다. 그룹은 회원들이 가입 한 모든 주제의 모든 파티션을 추적하기 때문에 사고가 옳다는 점에서 문제의 주제에 가입하지 않은 소비자의 균형을 다시 잡을 수 있습니다.

이 지점을 설명하는 작은 테스트는 아래를 참조하십시오. 두 개의 주제 test1 (2 개의 파티션)과 test2 (9 개의 파티션)가있는 중개인이 있고 두 소비자가 모두 같은 소비자 그룹으로 시작하고 각자 만 두 가지 주제 중 하나. 보시다시피, consumer2가 그룹에 가입하면 전체 그룹이 다시 균형을 조정하기 때문에 consumer1은 모든 파티션을 취소하고 재 할당합니다.

Subscribing consumer1 to topic test1 
Starting thread for consumer1 
Polling consumer1 
consumer1 got 0 partitions revoked! 
consumer1 got 2 partitions assigned! 
Polling consumer1 
Polling consumer1 
Polling consumer1 
Subscribing consumer2 to topic test2 
Starting thread for consumer2 
Polling consumer2 
Polling consumer1 
consumer2 got 0 partitions revoked! 
Polling consumer1 
Polling consumer1 
consumer1 got 2 partitions revoked! 
consumer2 got 9 partitions assigned! 
consumer1 got 2 partitions assigned! 
Polling consumer2 
Polling consumer1 
Polling consumer2 
Polling consumer1 
Polling consumer2 
관련 문제