처리 할 사람을 보내고 모든 오류를 수집하고 잭슨을 사용하여 직렬화하는 샘플 응용 프로그램을 만들려고합니다.프로세서와 싱크 사이의 메시지 변환
그러나 내 싱크는 프로세서에서 수신 한 메시지를 변환하려고 시도하지 않습니다. application/json 대신에 다른 contentType으로 메시지를받는 것처럼 보입니다. 이 기능을 사용하려면 어떻게해야합니까? 나는 봄 클라우드 Dalston.SR3
홈페이지를 사용하고
:
@SpringBootApplication
public class PersonStreamApplication {
@Bean
public MessageConverter customMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setObjectMapper(new ObjectMapper().registerModule(new VavrModule()));
return converter;
}
public static void main(String[] args) {
SpringApplication.run(PersonStreamApplication.class, args);
}
}
출처 :
@EnableBinding(PersonSource.Source.class)
public class PersonSource {
@Bean
@InboundChannelAdapter(value = Source.SAMPLE, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
public MessageSource<Person> getDate() {
Map<String, Object> headers = new HashMap<>();
headers.put("directory", "/dir");
return() -> createMessage(new Person("[email protected]", 28), new MessageHeaders(headers));
}
public interface Source {
String SAMPLE = "sample-source";
@Output(SAMPLE)
MessageChannel sampleSource();
}
}
프로세서 :
@RequiredArgsConstructor
@EnableBinding(Processor.class)
public class PersonProcessor {
private final PersonValidator validator;
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Either<String, Person> process(Person person) {
return this.validator.validate(person).toEither();
}
}
싱크 :
@EnableBinding(PersonSink.Sink.class)
public class PersonSink {
@Bean
public IntegrationFlow log() {
return IntegrationFlows.from(Sink.SAMPLE).log().get();
}
public interface Sink {
String SAMPLE = "sample-sink";
@Input(SAMPLE)
SubscribableChannel sampleSink();
}
}
application.yml와
spring:
cloud:
stream:
bindings:
sample-source:
destination: testing.stream.input
input:
destination: testing.stream.input
output:
content-type: application/json
destination: testing.stream.output
sample-sink:
destination: testing.stream.output
내 싱크에서'@ StreamListener'를 사용하도록 변경하면 데이터를 내 흐름에 전달할 수 있도록 MessageChannel을 주입해야합니까? 또 다른 문제는 헤더를 전파해야하고'@ StreamListener'가 헤더를 전파하지 않는다는 것입니다. –
맞아,'IntegrationFlow'의 채널로 보내려면'@ StreamListener'와 함께'@ SendTo'를 사용해야합니다. 메소드 인자가'Message <>'이거나'@Headers Map headers' 인자가'@ Payload'와 함께 있으면 헤더를 전파합니다. OTOH 대신'IntegrationFlow'에서'.transform (Transformers.fromJson (Person.class))'를 사용하는 것을 고려할 수도 있습니다. –