2017-09-18 1 views
1

처리 할 사람을 보내고 모든 오류를 수집하고 잭슨을 사용하여 직렬화하는 샘플 응용 프로그램을 만들려고합니다.프로세서와 싱크 사이의 메시지 변환

그러나 내 싱크는 프로세서에서 수신 한 메시지를 변환하려고 시도하지 않습니다. application/json 대신에 다른 contentType으로 메시지를받는 것처럼 보입니다. 이 기능을 사용하려면 어떻게해야합니까? 나는 봄 클라우드 Dalston.SR3

홈페이지를 사용하고

:

@SpringBootApplication 
public class PersonStreamApplication { 

    @Bean 
    public MessageConverter customMessageConverter() { 
     MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); 
     converter.setObjectMapper(new ObjectMapper().registerModule(new VavrModule())); 
     return converter; 
    } 

    public static void main(String[] args) { 
     SpringApplication.run(PersonStreamApplication.class, args); 
    } 
} 

출처 :

@EnableBinding(PersonSource.Source.class) 
public class PersonSource { 

    @Bean 
    @InboundChannelAdapter(value = Source.SAMPLE, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1")) 
    public MessageSource<Person> getDate() { 
     Map<String, Object> headers = new HashMap<>(); 
     headers.put("directory", "/dir"); 
     return() -> createMessage(new Person("[email protected]", 28), new MessageHeaders(headers)); 
    } 

    public interface Source { 

     String SAMPLE = "sample-source"; 

     @Output(SAMPLE) 
     MessageChannel sampleSource(); 
    } 
} 

프로세서 :

@RequiredArgsConstructor 
@EnableBinding(Processor.class) 
public class PersonProcessor { 

    private final PersonValidator validator; 

    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) 
    public Either<String, Person> process(Person person) { 
     return this.validator.validate(person).toEither(); 
    } 
} 

싱크 :

@EnableBinding(PersonSink.Sink.class) 
public class PersonSink { 

    @Bean 
    public IntegrationFlow log() { 
     return IntegrationFlows.from(Sink.SAMPLE).log().get(); 
    } 

    public interface Sink { 

     String SAMPLE = "sample-sink"; 

     @Input(SAMPLE) 
     SubscribableChannel sampleSink(); 
    } 
} 

application.yml와

spring: 
    cloud: 
    stream: 
     bindings: 
     sample-source: 
      destination: testing.stream.input 
     input: 
      destination: testing.stream.input 
     output: 
      content-type: application/json 
      destination: testing.stream.output 
     sample-sink: 
      destination: testing.stream.output 

답변

1

귀하의 문제는 다음과 같습니다 https://docs.spring.io/spring-cloud-stream/docs/Chelsea.SR2/reference/htmlsingle/index.html#_using_streamlistener_for_automatic_content_type_handling

구분 : 당신은 이득에게 Content-Type 변환을 얻을 수 @StreamListener를 사용할 필요가

@Bean 
public IntegrationFlow log() { 
    return IntegrationFlows.from(Sink.SAMPLE).log().get(); 
} 

@StreamListener과 스프링 사이 통합 @ServiceActivator은 문자열 페이로드와 contentType 헤더가인 인바운드 메시지를 고려할 때 나타납니다. @StreamListener의 경우 MessageConverter 메커니즘은 contentType 헤더를 사용하여 String 페이로드를 구문 분석하여 Vote 개체로 해석합니다.

+0

내 싱크에서'@ StreamListener'를 사용하도록 변경하면 데이터를 내 흐름에 전달할 수 있도록 MessageChannel을 주입해야합니까? 또 다른 문제는 헤더를 전파해야하고'@ StreamListener'가 헤더를 전파하지 않는다는 것입니다. –

+1

맞아,'IntegrationFlow'의 채널로 보내려면'@ StreamListener'와 함께'@ SendTo'를 사용해야합니다. 메소드 인자가'Message <>'이거나'@Headers Map headers' 인자가'@ Payload'와 함께 있으면 헤더를 전파합니다. OTOH 대신'IntegrationFlow'에서'.transform (Transformers.fromJson (Person.class))'를 사용하는 것을 고려할 수도 있습니다. –