2017-11-15 4 views
1

카프카 주제를 읽고이를 기반으로 일부 처리를 수행하고 다른 주제에 결과를 저장하려고합니다.Kafka 스트림을 사용하여 Seq 추출

내 코드는 다음과 같습니다

builder 
    .stream(settings.Streams.inputTopic) 
    .mapValues[Seq[Product]]((e: EventRecord) ⇒ fx(e)) 
    // Something needs to be done here... 
    .to(settings.Streams.outputTopic) 

fx(e) 기능은 일부 처리를 수행하고 Seq[Product]를 반환합니다. 모든 제품을 주제에 별도의 항목으로 저장하고 싶습니다. 문제는 주제에서 읽은 메시지가 여러 제품을 포함하므로 fx(e)의 반환 값입니다.

스트림에이 비헤이비어를 포함 할 수 있습니까?

답변

3

대신 사용 mapValuesflatMapValues : 속임수를 썼는지

import scala.collection.JavaConverters.asJavaIterableConverter 

builder 
    .stream(settings.Streams.inputTopic) 
    .flatMapValues(e => fx(e).toIterable.asJava) 
    .to(settings.Streams.outputTopic) 
+0

. 감사! –

관련 문제