2016-09-05 3 views
5

저는 작업중인 Java 응용 프로그램에 대한 Kafka 설명서를 많이 검토했습니다. 나는 Java 8에서 소개 된 lambda 문법에 익숙해 지려고 노력했지만, 그 배경에 대해서는 약간 개략적인데, 내가 아직 사용하고있는 것임을 확신하지 못한다.카프카 스트림 인쇄 콘솔에 입력 하시겠습니까?

나는 Kafka/Zookeeper Service를 아무런 문제없이 운영해 왔으며, 내가하고 싶은 것은 작은 예제 프로그램을 작성하여 입력을 기반으로 작성하지만, 많은 예제가 있기 때문에 단어 수를 계산하지 않는다. 이미.

예 데이터

This a sample string containing some keywords such as GPS, GEO and maybe a little bit of ACC. 

질문

나는 3 개 편지 키워드를 추출하여 인쇄 할 수 있도록하려면 :

는 샘플 데이터에 관해서는 나는 다음과 같은 구조의 문자열을 받고있을 것입니다 System.out.println입니다. 입력을 포함하는 문자열 변수를 얻으려면 어떻게해야합니까? 정규 표현식을 적용하는 방법이나 키워드를 얻기 위해 문자열을 검색하는 방법도 알고 있습니다.

코드

public static void main(String[] args) { 
    Properties props = new Properties(); 
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app_id"); 
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "0:0:0:0:0:0:0:1:9092"); 
    props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "0:0:0:0:0:0:0:1:2181"); 
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 

    final Serde<String> stringSerde = Serdes.String(); 

    KStreamBuilder builder = new KStreamBuilder(); 

    KStream<String, String> source = builder.stream(stringSerde, stringSerde, "in-stream"); 

    KafkaStreams streams = new KafkaStreams(builder, props); 
    streams.start(); 

    //How do I assign the input from in-stream to the following variable? 
    String variable = ? 
} 

그래서 나는 기본적으로 같은 String는 인스턴스 (생산자, 소비자 및 스트림의 모든에 나타납니다보고 싶어 사육사, 카프카, 생산자와 소비자가 모두 동일한 주제에 매여 실행을). 당신은 카프카 스트림을 사용하는 경우

답변

11

, 당신은 당신의 데이터 스트림에 기능/연산자를 적용해야합니다. 귀하의 경우에는, 당신은 따라서 당신이 source에 연산자를 적용 할하는 KStream 개체를 만듭니다.

원하는 작업에 따라 스트림의 각 레코드에 독립적으로 함수를 적용하는 연산자 (예 : map()) 또는 여러 레코드에 함께 기능을 적용하는 다른 연산자 (예 : aggregateByKey())가 있습니다. http://docs.confluent.io/3.0.0/streams/developer-guide.html#kafka-streams-dsl 및 예 및 https://github.com/confluentinc/examples/tree/kafka-0.10.0.0-cp-3.0.0/kafka-streams

위의 예에서와 같이 Kafka Streams를 사용하여 로컬 변수를 만들지 않고 함께 연결되는 연산자/함수에 모든 것을 포함 시키십시오. 당신은 표준 출력에 대한 모든 입력 레코드를 인쇄 할 경우 streams.start()를 통해 응용 프로그램을 시작한 후

예를 들어, 당신은 것입니다 소비자 당신의 기록 입력 항목 각각에 대해, 따라서

KStream<String, String> source = builder.stream(stringSerde, stringSerde, "in-stream"); 
source.foreach(new ForeachAction<String, String>() { 
    void apply(String key, String value) { 
     System.out.println(key + ": " + value); 
    } 
}); 

을 할 수 레코드를 입력하면 apply(...)에 대한 호출이 수행되고 stdout에 레코드가 인쇄됩니다.

물론, 콘솔에 스트림을 인쇄하기위한 더 기본 방법은 source.print()을 사용하는 것입니다 (내부적으로 기본적으로 이미 주어진 ForeachAction와 같이 foreach() 연산자와 동일하다.)와 예를 들어

문자열을 로컬 변수에 할당하면 apply(...)에 코드를 입력하고 정규식 등을 사용하여 "3 개의 문자 키워드를 추출"해야합니다.

그러나이를 표현하는 가장 좋은 방법은 flatMapValues()과 (즉, source.flatMapValues(...).print())의 조합을 통해 이루어집니다.flatMapValues()이 각 입력 레코드에 대해 호출됩니다 (귀하의 경우 키가 null이므로 무시할 수 있다고 가정합니다). flatMapValue 함수 내에서 정규식을 적용하고 각 일치 항목에 대해 마지막으로 반환하는 값 목록에 일치 항목을 추가합니다.

source.flatMapValues(new ValueMapper<String, Iterable<String>>() { 
    @Override 
    public Iterable<String> apply(String value) { 
     ArrayList<String> keywords = new ArrayList<String>(); 

     // apply regex to value and for each match add it to keywords 

     return keywords; 
    } 
} 

flatMapValues의 출력은 각각의 키워드에 (즉, 출력 스트림이 모든 목록 ValueMapper#apply()에서 수익을 통해 "노동 조합"입니다)에 대한 기록을 포함, 다시 KStream 될 것입니다. 마지막으로 결과를 print()을 통해 콘솔에 출력합니다. (물론 flatMapValue + print 대신 foreach을 사용할 수도 있지만 모듈화는 적습니다.)

+0

와우. 훌륭한 대답의 동료. 이것은 내가 찾고 있었던 것이다! – Zeliax

+0

YW. 자유롭게 forevote : –

+1

')'foreach 루프 끝 부분에 없습니다. – asitm9