2017-12-15 4 views
3

현재 한 카프카 클러스터의 토픽에서 다른 토픽으로 메시지를 쉽게 스트리밍하려고합니다 (원격 -> 로컬 클러스터).
아이디어는 Kafka-Streams를 즉시 사용하여 로컬 클러스터의 실제 메시지를 복제 할 필요가 없지만 Kafka-Streams 처리의 "결과"만 Kafka-Topics에 가져올 수 있습니다.한 카프카 클러스터에서 다른 카프카 클러스터로 메시지 스트리밍

WordCount 데모가 다른 PC의 Kafka-Instance에 있다고 가정 해 봅시다. 로컬 컴퓨터에서도 Kafka-Instance가 실행 중입니다.
이제 단어 수를 계산해야하는 문장이 포함 된 주제 ("원격")에서 WordCount 데모를 실행하고 싶습니다.
그러나 "원격"주제 대신 로컬 시스템의 주제에 기록해야합니다.

Kafka-Streams API에서 이와 같은 기능을 수행 할 수 있습니까?
예. 팀

답변

4

카프카 스트림 단일 클러스터 만 구축이다 -

val builder: KStreamBuilder = new KStreamBuilder(remote-streamConfig, local-streamconfig) 
val textLines: KStream[String, String] = builder.stream("remote-input-topic", 
remote-streamConfig) 
val wordCounts: KTable[String, Long] = textLines 
    .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava) 
    .groupBy((_, word) => word) 
    .count("word-counts") 

wordCounts.to(stringSerde, longSerde, "local-output-topic", local-streamconfig) 

val streams: KafkaStreams = new KafkaStreams(builder) 
streams.start() 

당신에게
대단히 감사합니다.

해결 방법은 foreach() 또는 유사하게 사용하고 대상 클러스터에 쓰는 자신의 KafkaProducer을 인스턴스화하는 것입니다. 참고, 자신의 프로듀서는 이어야합니다 동기화 쓰기 사용! 그렇지 않으면 실패 할 경우 데이터가 손실 될 수 있습니다. 따라서, 그것은 매우 performant 솔루션 아니에요.

결과를 원본 클러스터에 쓴 다음 데이터를 대상 클러스터에 복제하는 것이 좋습니다. 실제 데이터는 대상 클러스터에 더 긴 보유 시간과 함께 저장되기 때문에 소스 클러스터에서 출력 항목의 보관 기간을 훨씬 단축 할 수 있습니다. 이렇게하면 원본 클러스터에서 필요한 저장소를 제한 할 수 있습니다.

+0

대단히 감사합니다. Matthias, 알아두면 좋습니다! –

+0

메시지를 동 기적으로 검색하는 가장 좋은 방법은 무엇입니까? 각 메시지에서'.get()'메서드를 호출하는 것은 약간 "해킹"되는 것처럼 보입니다. 속성이 있습니까? –

+1

'get()'을 사용하는 것이 정확합니다. –

3

위의 답변에서 Matthias가 언급 한 Replicator을 확인하십시오. 이것은 당신의 묘사가 멋지게 맞는 것입니다.

+0

완벽! 대단히 감사합니다. –

관련 문제