2016-09-08 2 views
3

스프링 클라우드 스트림 소스 빈을 스프링 부트 애플리케이션에 작성하려고합니다. 메소드의 결과를 스트림으로 보냅니다 (기본 카프카 토픽은 흐름).스프링 클라우드 스트림 소스를 사용하여 메소드 결과를 스트림으로 보내기

내가 본 스트림 샘플의 대부분은 @InboundChannelAdapter 주석을 사용하여 폴러를 사용하여 스트림으로 데이터를 보냅니다. 하지만 폴러를 사용하고 싶지는 않습니다. 폴러를 빈 배열로 설정하려고 시도했지만 다른 문제는 @InboundChannelAdapter를 사용할 때 메서드 매개 변수를 사용할 수 없다는 것입니다.

내가하려는 일의 전반적인 개념은 인바운드 스트림에서 읽는 것입니다. 비동기 처리를 수행 한 다음 결과를 아웃 바운드 스트림에 게시하십시오. 따라서 프로세서를 사용하는 것도 옵션이 아닙니다. Sink 채널과 함께 @StreamListener을 사용하여 인바운드 스트림을 읽었으며 작동합니다.

여기 제가 시도한 코드가 있지만 이것은 전혀 작동하지 않습니다. 내 싱크가 어쩌면 그랬을 수도 있기 때문에 그렇게 간단한 것이기를 바랬습니다. 프로세서가 아닌 소스 (예 : 인바운드 채널에서 수신 할 필요가 없음)의 예를 가리키는 누군가를 찾고 @InboundChannelAdapter을 사용하지 않거나 내가해야 할 일을 수행하기위한 디자인 팁을 제공하지 않습니다. 다른 방법으로. 감사! 당신은 폴러를 사용하지 않으려면

@EnableBinding(Source.class) 
public class JobForwarder { 

    @ServiceActivator(outputChannel = Source.OUTPUT) 
    @SendTo(Source.OUTPUT) 
    public String forwardJob(String message) { 
     log.info(String.format("Forwarding a job message [%s] to queue [%s]", message, Source.OUTPUT)); 
     return message; 
    } 
} 

답변

0

그래서, 무엇이 forwardJob() 메소드가 호출되도록한다?

메서드를 호출하고 결과를 출력 채널로 보낼 수는 없습니다.

현재 구성에서는 인바운드 메시지가 포함 된 서비스 (해당 채널에 메시지를 보낼 항목)가 포함 된 inputChannel이 필요합니다. 운송 수단에 구속 될 필요는 없습니다. 간단한 MessageChannel@Bean 일 수 있습니다.

또는 @Publisher을 사용하여 메서드 호출 결과를 게시하고 (호출자에게 반환되는 경우) docs here 수 있습니다.

@Publisher(channel = Source.OUTPUT) 
0

감사합니다. 문제로 돌아 가기까지 어느 정도 시간이 걸렸습니다. @Publisher에 대한 설명서를 읽으려고했습니다. 그것은 내가 필요로했던 것처럼 보였습니다.하지만 제대로 된 bean을 초기화하기 위해 초기화 된 적절한 bean을 얻을 수 없었습니다.

forwardJob() 메서드는 입력의 일부 비동기 처리 후에 호출됩니다.

결국 나는 방금 spring-kafka 라이브러리를 사용하여 구현했으며 훨씬 더 명확하고 쉽게 갈 수 있다고 느꼈습니다. 나는 카프카를 유일한 채널 바인딩으로 고수 할 것이므로 그 라이브러리를 계속 사용할 것이라고 생각합니다.

그러나 스프링 클라우드 스트림 라이브러리는 결국 간단하게 작동합니다. 폴러가없는 단일 소스에 대한 코드가 있습니다.

@Component 
@EnableBinding(Source.class) 
public class JobForwarder { 

    private Source source; 

    @Autowired 
    public ScheduledJobForwarder(Source source) { 
     this.source = source; 
    } 

    public void forwardScheduledJob(String message) { 
     log.info(String.format("Forwarding a job message [%s] to queue [%s]", message, Source.OUTPUT)); 
     source.output().send(MessageBuilder.withPayload(message).build()); 
    } 
} 
3

기본 요구 사항은 아래 단계를 통해 얻을 수 있습니다.

  1. 사용자 정의 바인딩 인터페이스를 만들기 바운드 채널

    @Component 
    @EnableBinding(CustomSource.class) 
    public class CustomOutputEventSource { 
    
        @Autowired 
        private CustomSource customSource; 
    
        public void sendMessage(String message) { 
         customSource.output().send(MessageBuilder.withPayload(message).build()); 
        } 
    } 
    
  2. 테스트를 주입

    public interface CustomSource { 
        String OUTPUT = "customoutput"; 
    
        @Output(CustomSource.OUTPUT) 
        MessageChannel output(); 
    } 
    
  3. (당신은뿐만 아니라 기본 @EnableBinding(Source.class)을 사용할 수 있습니다) 그

    @RunWith(SpringRunner.class) 
    @SpringBootTest 
    public class CustomOutputEventSourceTest { 
    
        @Autowired 
        CustomOutputEventSource output; 
    
        @Test 
        public void sendMessage() { 
         output.sendMessage("Test message from JUnit test"); 
        } 
    } 
    
관련 문제