2017-09-20 2 views
1

청취자 클래스에 ConsumerSeekAware을 구현 중입니다. 그러나 overridden 메소드 registerSeekCallback, onPartitionsAssigned, onIdleContainer는 결코 호출되지 않습니다. 아래는 내 소비자 구성 및 소비자 파일입니다. 실수를 식별 할 수 없습니다.스프링 카프카 -Consumer가 인식하지 못하는 메소드를 호출합니다.

Spring-kafka : 1.1.6 버전을 사용합니다.

소비자 :

package com.test.kafka.binder; 
    import java.util.Map; 
    import org.apache.kafka.common.TopicPartition; 
    import org.springframework.kafka.annotation.KafkaListener; 
    import org.springframework.kafka.listener.ConsumerSeekAware; 
    import org.springframework.stereotype.Component; 
    @Component 
    public class Consumer implements ConsumerSeekAware{ 

    @KafkaListener(topics = "${kafka.topic}", group = "foo") 
    public void listen(String message) { 
     System.out.println("Received Messasge in group foo: " + message); 
    } 

    @Override 
    public void registerSeekCallback(ConsumerSeekCallback callback) { 
     // TODO Auto-generated method stub 
     System.out.println("registerSeekCallback"); 

    } 

    @Override 
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) { 
     // TODO Auto-generated method stub 
     System.out.println("onPartitionsAssigned"); 
    } 

    @Override 
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) { 
     // TODO Auto-generated method stub 
     System.out.println("onIdleContainer"); 

    } 
} 

소비자 구성 :

package com.test.kafka.binder; 

import java.util.HashMap; 
import java.util.Map; 

import org.apache.kafka.clients.consumer.ConsumerConfig; 
import org.apache.kafka.common.serialization.StringDeserializer; 
import org.springframework.beans.factory.annotation.Value; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 
import org.springframework.kafka.annotation.EnableKafka; 
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; 
import org.springframework.kafka.core.ConsumerFactory; 
import org.springframework.kafka.core.DefaultKafkaConsumerFactory; 

@EnableKafka 
@Configuration 
public class KafkaConsumerConfig { 


    @Value("${kafka.bootstrap-servers}") 
    private Object bootstrapAddress; 


    @Bean 
    public ConsumerFactory<String, String> consumerFactory() { 
     Map<String, Object> props = new HashMap<>(); 
     props.put(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
      bootstrapAddress); 
     props.put(
      ConsumerConfig.GROUP_ID_CONFIG, 
      "test"); 
     props.put(
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
      StringDeserializer.class); 
     props.put(
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
      StringDeserializer.class); 
     return new DefaultKafkaConsumerFactory<>(props); 
    } 

    @Bean 
    public ConcurrentKafkaListenerContainerFactory<String, String> 
     kafkaListenerContainerFactory() { 

     ConcurrentKafkaListenerContainerFactory<String, String> factory 
      = new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(consumerFactory()); 
     return factory; 
    } 
} 
+0

의 전원을 켭니다; 당신이 그것을 알아낼 수 없다면, 로그를 어딘가에 게시하십시오. –

+0

@GrayRussell spring-kafka 디버그 로그에서 찾을 수 없습니다. 여기에 공유 된 로그 => http://text-share.com/view/a46d764a – Hariharan

답변

0

이 그것을 마스터에 고정되는 known problem이다. 1.1.7을 릴리스해야합니다.

그러나 1.2.1로 업그레이드 할 수있는 경우 이미 수정되었습니다. 당신은 또한 유휴 이벤트 활성화해야

참고 : DEBUG 로깅에

factory.getContainerProperties().setIdleEventInterval(10_000L); 
+0

내 업데이트도 참조하십시오. –

관련 문제