2017-01-16 1 views
1

Spring Cloud Stream을 사용 중이며 프로그래밍 방식으로 채널을 만들고 바인딩하려고합니다. 나의 활용 사례는 응용 프로그램을 시작하는 동안 구독 할 Kafka 항목의 동적 목록을 수신한다는 것입니다. 각 주제에 대한 채널을 만들려면 어떻게해야합니까?Spring Cloud Stream 동적 채널

+0

당신은 여기에 비슷한 질문이 답변을 확인할 수 있습니다 http://stackoverflow.com/questions/40485421/spring-cloud-stream-to-support-routing-messages-dynamically –

+0

대답은 발신위한 것임을을 메시지. 나는 들어오는 것들이 필요하다 : ( – Nikem

답변

0

수신 메시지의 경우 명시 적으로 BinderAwareChannelResolver을 사용하여 대상을 동적으로 확인할 수 있습니다. 이것을 확인하실 수 있습니다 examplerouter 싱크 바인더 인식 채널 리졸버를 사용합니다.

+0

나는 이해하지 못한다. 런타임에만 알고있는 사람들의 주제에 가입하고 싶다. 나는 메시지를 보내고 싶지 않다 . – Nikem

+0

오, 미안해, 나는 오해했다. 동적 '대상 지원은 제작자 만 바인딩하는 것입니다.이 기능은 아직 다루지 않았으며 여기에 포함되어 있다고 생각합니다. https://github.com/spring-cloud/spring-cloud-stream/issues/746 –

0

Camel Spring Cloud Stream 구성 요소에 대해 비슷한 작업을 수행해야했습니다. 아마도 "채널 이름을 나타내는 String"이라는 대상을 바인딩하는 소비자 코드가 유용할까요?

내 경우에는 하나의 대상 만 바인딩하지만 여러 대상에 대해서는 개념적으로 많이 다르다고 나는 상상하지 못합니다.

@Override 
    protected void doStart() throws Exception { 
     SubscribableChannel bindingTarget = createInputBindingTarget(); 
     bindingTarget.subscribe(message -> { 
      // have your way with the received incoming message 
     }); 

     endpoint.getBindingService().bindConsumer(bindingTarget, 
       endpoint.getDestination()); 

     // at this point the binding is done 
    } 

    /** 
    * Create a {@link SubscribableChannel} and register in the 
    * {@link org.springframework.context.ApplicationContext} 
    */ 
    private SubscribableChannel createInputBindingTarget() { 
     SubscribableChannel channel = endpoint.getBindingTargetFactory() 
       .createInputChannel(endpoint.getDestination()); 
     endpoint.getBeanFactory().registerSingleton(endpoint.getDestination(), channel); 
     channel = (SubscribableChannel) endpoint.getBeanFactory().initializeBean(channel, 
       endpoint.getDestination()); 
     return channel; 
    } 

더 컨텍스트에 대한 전체 소스 here를 참조하십시오

다음은 그것의 요점이다.

0

최근에 비슷한 시나리오가 발생했으며 아래에 SubscriberChannels를 동적으로 만드는 예제가 있습니다.

ConsumerProperties consumerProperties = new ConsumerProperties(); 
    consumerProperties.setMaxAttempts(1); 
    BindingProperties bindingProperties = new BindingProperties(); 
    bindingProperties.setConsumer(consumerProperties); 
    bindingProperties.setDestination(retryTopic); 
    bindingProperties.setGroup(consumerGroup); 

    bindingServiceProperties.getBindings().put(consumerName, bindingProperties); 
    SubscribableChannel channel = (SubscribableChannel)bindingTargetFactory.createInput(consumerName); 
    beanFactory.registerSingleton(consumerName, channel); 
    channel = (SubscribableChannel)beanFactory.initializeBean(channel, consumerName); 
    bindingService.bindConsumer(channel, consumerName); 
    channel.subscribe(consumerMessageHandler); 
관련 문제