2016-08-04 3 views
1

아래 코드에서 stream1과 stream2는 모두 개별적으로 잘 실행되며 출력을 볼 수 있지만 조인 된 스트림은 아무 것도 기록하지 않습니다. 조인 창과 관련이 있다는 느낌이 들지만 양쪽 스트림의 데이터는 거의 정확히 같은 시간에 나타납니다.결합 된 카프카 스트림을 실행하거나 출력 할 수 없습니다.

val stream = builder.stream(stringSerde, byteArraySerde, "topic") 

val stream1 = stream 
    .filter((key, value) => somefilter(key, value)) 
    .through(stringSerde, byteArraySerde, "topic1") 

val stream2 = stream 
    .filter((key, value) => someotherfilter(key, value)) 
    .through(stringSerde, byteArraySerde, "topic2") 

val joinedStream = stream1 
    .join(stream2, (value1: Array[Byte], value2: Array[Byte]) => { 
    println("wont print anything") 
    return somerandomdata 
    }, 
    JoinWindows.of("othertopic").within(10000L), 
    stringSerde, byteArraySerde, byteArraySerde) 
+1

조인 윈도우가 포함 된 기록 타임 스탬프 (이상 계산 즉,에 포함되어있는 메타 데이터 키와 값에 추가로 각 레코드). 디버깅을 위해 타임 스탬프를 인쇄하면 도움이됩니다. 이들에 접근하려면 process() - 주어진'context' 객체를 사용할 필요가 있습니다. 현재 처리 된 레코드의 타임 스탬프를 포함합니다 (즉, 처리 된 각 레코드에 대해 컨텍스트가 업데이트됩니다). –

답변

관련 문제