Spring Cloud Stream을 사용 중이며 프로그래밍 방식으로 채널을 만들고 바인딩하려고합니다. 나의 활용 사례는 응용 프로그램을 시작하는 동안 구독 할 Kafka 항목의 동적 목록을 수신한다는 것입니다. 각 주제에 대한 채널을 만들려면 어떻게해야합니까?Spring Cloud Stream 동적 채널
답변
수신 메시지의 경우 명시 적으로 BinderAwareChannelResolver
을 사용하여 대상을 동적으로 확인할 수 있습니다. 이것을 확인하실 수 있습니다 examplerouter
싱크 바인더 인식 채널 리졸버를 사용합니다.
나는 이해하지 못한다. 런타임에만 알고있는 사람들의 주제에 가입하고 싶다. 나는 메시지를 보내고 싶지 않다 . – Nikem
오, 미안해, 나는 오해했다. 동적 '대상 지원은 제작자 만 바인딩하는 것입니다.이 기능은 아직 다루지 않았으며 여기에 포함되어 있다고 생각합니다. https://github.com/spring-cloud/spring-cloud-stream/issues/746 –
Camel Spring Cloud Stream 구성 요소에 대해 비슷한 작업을 수행해야했습니다. 아마도 "채널 이름을 나타내는 String
"이라는 대상을 바인딩하는 소비자 코드가 유용할까요?
내 경우에는 하나의 대상 만 바인딩하지만 여러 대상에 대해서는 개념적으로 많이 다르다고 나는 상상하지 못합니다.
@Override
protected void doStart() throws Exception {
SubscribableChannel bindingTarget = createInputBindingTarget();
bindingTarget.subscribe(message -> {
// have your way with the received incoming message
});
endpoint.getBindingService().bindConsumer(bindingTarget,
endpoint.getDestination());
// at this point the binding is done
}
/**
* Create a {@link SubscribableChannel} and register in the
* {@link org.springframework.context.ApplicationContext}
*/
private SubscribableChannel createInputBindingTarget() {
SubscribableChannel channel = endpoint.getBindingTargetFactory()
.createInputChannel(endpoint.getDestination());
endpoint.getBeanFactory().registerSingleton(endpoint.getDestination(), channel);
channel = (SubscribableChannel) endpoint.getBeanFactory().initializeBean(channel,
endpoint.getDestination());
return channel;
}
더 컨텍스트에 대한 전체 소스 here를 참조하십시오
다음은 그것의 요점이다.
최근에 비슷한 시나리오가 발생했으며 아래에 SubscriberChannels를 동적으로 만드는 예제가 있습니다.
ConsumerProperties consumerProperties = new ConsumerProperties();
consumerProperties.setMaxAttempts(1);
BindingProperties bindingProperties = new BindingProperties();
bindingProperties.setConsumer(consumerProperties);
bindingProperties.setDestination(retryTopic);
bindingProperties.setGroup(consumerGroup);
bindingServiceProperties.getBindings().put(consumerName, bindingProperties);
SubscribableChannel channel = (SubscribableChannel)bindingTargetFactory.createInput(consumerName);
beanFactory.registerSingleton(consumerName, channel);
channel = (SubscribableChannel)beanFactory.initializeBean(channel, consumerName);
bindingService.bindConsumer(channel, consumerName);
channel.subscribe(consumerMessageHandler);
- 1. Spring Cloud Stream + Quartz
- 2. Spring Cloud Stream RabbitMQ
- 3. Cloud Foundry에서 Spring Cloud Stream : Stream 애플리케이션에 RabbitMq 소스 바인딩하기
- 4. Spring Cloud Stream : ClassNotFoundException : org.springframework.integration.codec.kryo.KryoRegistrar
- 5. Spring Cloud Stream Dispatcher에는 구독자가 없습니다.
- 6. Spring Cloud Stream RabbitMQ가있는 수동 확인 ack
- 7. 하나의 애플리케이션으로서의 Spring Cloud Stream 플로우
- 8. Spring Cloud Stream JMS ActiveMQ를 만드는 방법
- 9. spring-cloud-stream-kafka 구성 : instanceCount & instanceIndex
- 10. Spring Cloud Stream Kafka 바인더 압축
- 11. Spring Cloud Stream : Sink에서 RabbitMQ 연결이 끊어졌습니다.
- 12. spring-cloud-stream 프로듀서 트랜잭션 성
- 13. Spring Cloud Stream - 메시지 일괄 처리
- 14. java 인터페이스가없는 Spring Cloud 메시지 채널
- 15. "Spring Cloud Stream"대신 "Spring Rabbit"을 사용해야합니까?
- 16. Spring Cloud Stream Rabbit에서 수동 응답을 관리하는 방법은 무엇입니까?
- 17. Spring Integration : Returning stream
- 18. Spring Cloud Stream 프로젝트에 웹 서버 (Tomcat)가 필요합니까?
- 19. Spring Cloud Dataflow - Stream Apps Env 변수 설정
- 20. spring-cloud-stream 기반의 마이크로 서비스에서 python 스크립트를 호출하는 방법
- 21. spring-cloud-starter-stream-kafka는 spring.boot.cloud.stream.bindings.output.destination에 제공된 주제를 생성하지 않습니다.
- 22. Spring Cloud Stream Processor 단일 입력, 다중 행 출력
- 23. Kafka 브로커가 Zookeeper에 연결할 수없는 Dockerized Spring Cloud Stream 서비스
- 24. Spring Cloud Stream - 여러 소스가 단일 스프링 부트 프로젝트입니다.
- 25. spring-cloud-stream-schema 클라이언트; 쉽게 서버 포트를 설정하는 방법?
- 26. Spring Cloud Stream Ribbit MQ에 자동으로 다시 연결
- 27. 어떤 공식적인 Spring Cloud Stream Apps 소스 저장소입니까?
- 28. spring-cloud-starter-stream-source-file 처리 후 파일을 삭제하십시오.
- 29. spring cloud stream kafka : 'input'에 대해 중복 된 @StreamListener 매핑
- 30. spring-cloud-starter-stream-source-jdbc 샘플 애플리케이션?
당신은 여기에 비슷한 질문이 답변을 확인할 수 있습니다 http://stackoverflow.com/questions/40485421/spring-cloud-stream-to-support-routing-messages-dynamically –
대답은 발신위한 것임을을 메시지. 나는 들어오는 것들이 필요하다 : ( – Nikem