2017-04-12 4 views
1

저는 카프카 소비자 및 프로듀서 API를 사용하여 잠시 동안 작업 해 왔으며 스트림 API에서 손을 사용하고 싶습니다. 나는 수많은 참고 문헌을 온라인에서 살펴 봤지만이 간단한 것을 이해할 수는 없다.카프카 출력 스트림

어떻게 출력 항목으로 만 메시지를 보내는 KStream을 만드십니까?

github repo에있는 가장 기본적인 예제를 예로 들어 보겠습니다. https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java 하나의 대기열에서 메시지를 가져 와서 조작 한 후에 다른 대기열에 메시지를 게시합니다. 이 같은

뭔가 :

final KStreamBuilder builder = new KStreamBuilder(); 
final KStream<String, String> textLines = builder.stream(); 
// do the dirty work... 
textLines.to("outputTopic") 

그러나 builder.stream();가 존재하지 않습니다 그것은 매우 적어도 입력 항목 이름에 필요합니다.

나는 이것을 위해 일반 카프카 프로듀서에 붙여야할까요? 그렇다면 나는이 말을 명시 적으로 말하는 행운을 찾지 못했다.

+0

"더러운 일"부분에서 무엇을하고 싶은지 궁금합니다. 귀하의 유스 케이스 시나리오를 이해하고 싶습니다. –

+0

나는 외부 소스 형태로 데이터를 보내고있다. 일부는 필터링을 수행 한 후 특정 주제로 보내야합니다. 내 생각은 kstream을 활용하는 것이 었습니다. 앞으로 메시지가 kafka를 통해 들어올 수 있으므로 입력 주제를 쉽게 리펙토링 할 수 있기 때문입니다. 이제는 공급자를 만들고 나중에 옮겨야하는 대신. – Rhed

+0

Kafka Connect를 사용하면 필터링을 포함한 "단일 메시지 변환"을 수행 할 수 있습니다. http://kafka.apache.org/documentation/#connect_transforms –

답변

2

Kafka Streams API는 주제를 입력 스트림으로 사용하고, 레코드를 처리하고, 결과를 주제로 다시 쓰도록 설계되었습니다. 카프카에 데이터를 쓰는 것이 아닙니다.

그래, 주제에 데이터를 쓰려면 KafkaProducer을 사용해야합니다.

+0

특별한 이유가 있습니까? 근본적인 무언가가 있는지 궁금해서 api에 대해 빠져 있습니다. 나는 많이 생각했다. – Rhed

+0

Streams는 Kafka에 저장된 데이터를 처리하도록 설계되었습니다. 그러나 Kafka Connect를 사용하면 필터링을 포함한 "단일 메시지 변환"을 수행 할 수 있습니다 : kafka.apache.org/documentation/#connect_transforms –

관련 문제