카프카 스트림 단일 클러스터 만 구축이다 -
val builder: KStreamBuilder = new KStreamBuilder(remote-streamConfig, local-streamconfig)
val textLines: KStream[String, String] = builder.stream("remote-input-topic",
remote-streamConfig)
val wordCounts: KTable[String, Long] = textLines
.flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
.groupBy((_, word) => word)
.count("word-counts")
wordCounts.to(stringSerde, longSerde, "local-output-topic", local-streamconfig)
val streams: KafkaStreams = new KafkaStreams(builder)
streams.start()
당신에게
대단히 감사합니다.
해결 방법은 foreach()
또는 유사하게 사용하고 대상 클러스터에 쓰는 자신의 KafkaProducer
을 인스턴스화하는 것입니다. 참고, 자신의 프로듀서는 이어야합니다 동기화 쓰기 사용! 그렇지 않으면 실패 할 경우 데이터가 손실 될 수 있습니다. 따라서, 그것은 매우 performant 솔루션 아니에요.
결과를 원본 클러스터에 쓴 다음 데이터를 대상 클러스터에 복제하는 것이 좋습니다. 실제 데이터는 대상 클러스터에 더 긴 보유 시간과 함께 저장되기 때문에 소스 클러스터에서 출력 항목의 보관 기간을 훨씬 단축 할 수 있습니다. 이렇게하면 원본 클러스터에서 필요한 저장소를 제한 할 수 있습니다.
대단히 감사합니다. Matthias, 알아두면 좋습니다! –
메시지를 동 기적으로 검색하는 가장 좋은 방법은 무엇입니까? 각 메시지에서'.get()'메서드를 호출하는 것은 약간 "해킹"되는 것처럼 보입니다. 속성이 있습니까? –
'get()'을 사용하는 것이 정확합니다. –