1
카프카 주제를 읽고이를 기반으로 일부 처리를 수행하고 다른 주제에 결과를 저장하려고합니다.Kafka 스트림을 사용하여 Seq 추출
내 코드는 다음과 같습니다
builder
.stream(settings.Streams.inputTopic)
.mapValues[Seq[Product]]((e: EventRecord) ⇒ fx(e))
// Something needs to be done here...
.to(settings.Streams.outputTopic)
fx(e)
기능은 일부 처리를 수행하고 Seq[Product]
를 반환합니다. 모든 제품을 주제에 별도의 항목으로 저장하고 싶습니다. 문제는 주제에서 읽은 메시지가 여러 제품을 포함하므로 fx(e)
의 반환 값입니다.
스트림에이 비헤이비어를 포함 할 수 있습니까?
. 감사! –