2017-09-26 1 views
3

flink 카프카 소비자와 함께 스트림 (kafka 메시지가 주제에 스트리밍 됨)이 있으며 흥미로운 행동을 느낍니다.Flink : 스트림이 끝날 때 창에서 데이터를 처리하지 않습니다.

데이터가 스트리밍 될 때, "완료"전에 데이터가 중지되거나 데이터가 끝나고 (몇 개의 창 후에) 창 끝까지 도달하지 않으면 나머지 파이프 라인은 방아쇠하지 마.

예 흐름 :

env.addSource(kafkaConsumer) 
     .flatMap(new TokenMapper()) 
     .keyBy("word") 
     .window(TumblingEventTimeWindows.of(Time.seconds(10L))) 
     .reduce(new CountTokens()) 
     .flatMap(new ConvertToString()) 
     .addSink(producer); 

내가 TimeCharacteristic이 EVENTTIME로 설정 ENV으로 FlinkKafkaConsumer010을 사용하고 있습니다. 및 consumer.assignTimestampsAndWatermarks (새 PeriodicWatermarks()) 내 창을 10 초라고하고, 내 데이터 스트림은 (일정 기간 동안 스트리밍을 중지 한 후 등) 데이터를 8 초이 포함 된 경우

private static class PeriodicWatermarks implements AssignerWithPeriodicWatermarks<String>{ 

    private long currentMaxTimestamp; 
    private final long maxOutOfOrderness; 

    public PeriodicWatermarksAuto(long maxOutOfOrderness){ 
     this.maxOutOfOrderness = maxOutOfOrderness; 
    } 

    @Override 
    public Watermark getCurrentWatermark() { 
     return new Watermark(currentMaxTimestamp - maxOutOfOrderness); 
    } 

    @Override 
    public long extractTimestamp(String t, long l) { 
     // this should be the event timestamp 
     currentMaxTimestamp = l; 
     logger.info("TIMESTAMP: " + l); 
     return l; 
    } 
} 

의 flatMap- 새로운 나중에 데이터에 스트리밍 될 때까지> 싱크 처리하지

예 데이터 스트림 처리 문제 :. 예를 들어, I (35)를 가지고 마찬가지로 경우

 xxxxxxxx(8secs)------(gap)--(later more data)xxxxx 
     ^(not processed)   (until I get here)^ 

(X는 각각 제 당 데이터의 조각이다) 초당 스트리밍 가치 ata (그리고 다시 내 창은 10 초) 데이터 트리거 만 3 창, 나머지 5 초 분량의 데이터는 처리되지 않습니다.

 ...xxxxxxxxxx(10secs)xxxxx(5secods)------(gap)--(later more data)xxxxx 
     (processed)  ^(not processed)   (until I get here)^ 

내 창은 10 초입니다 내가 스트리밍 데이터를 5 초 flatmap- 만이 마지막 경우> 일이 결코 가라.

제 질문은 입니다. 잠시 후 데이터가 표시되지 않으면 처리 할 창 데이터를 트리거하는 방법은 무엇입니까?

내 데이터가 실시간 스트리밍되는 경우 아무런 데이터가없는 것을 볼 수 있으며 마지막 창 (단지 5 초 분량의 데이터 만 표시)이 원하지 않을 수도 있습니다. 새로운 데이터가 들어올 때까지 시간, 창 시간이 지난 후에 그 마지막 창에 대한 결과를 원할 것입니다.

큰 소리로 생각해 보면 이것은 ProcessingTime 대신 EventTime을 사용했기 때문인 것으로 보이거나 실제로 워터 마크를 마지막 창에 표시하기 위해 워터 마크가 제대로 생성되지 않고있는 것 같습니다. 스트림이 마지막 비트가 끝나지 않으면이 문제는 누구에게나 문제가 될 것이라고 생각합니다. 나는 아마도 end-of-stream msg를 보낼 수 있다고 말하고 싶지만 소스가 스트림을 깨기 때문에 스팀이 끝나면 도움이되지 않습니다.

편집 : 그래서 처리 시간이 바뀌고 마지막 창에서 데이터를 올바르게 처리하므로 EventTime이 원인을 추측합니다. 맞춤 트리거 또는 적절한 창 워터 마크가 대답 일 수 있다고 생각합니다. .

도움을 주셔서 감사합니다!

답변

2

내가 생각하기에 워터 마크와 관련하여이 문제가 후계자에게 남겨 두겠습니다. 타임 스탬프 및 워터 메이커 (assignTimestampsAndWatermarks에서)는 'getCurrentWatermark()'를 호출하고 워터 마크를 고정 엔터티 (타임 스탬프 - 최대 오프셋)로 설정하여 워터 마크가 새 엔티티를 확인할 때까지 업데이트하지 않습니다.

내 솔루션은 구성 가능한 시간 내에 데이터가 표시되지 않으면 결국 워터 마크를 다음 창으로 이동시키는 일종의 타이머입니다. 매우 잠복 한 데이터를 처리 할 수는 없지만 이것이 문제가 될 것이라고는 생각하지 않습니다. 이는 EventTime 처리의 의도 된 동작입니다.

+0

시간을 처리하기 위해 사용자 정의 트리거를 창에 추가하는 것이 좋습니다. eventTime 창은 이와 같은 이유로 맞춤 처리 시간 트리거와 함께 사용되는 경우가 많습니다. – Jicaar

+0

그게 내가 생각하고 있었던거야. 내가 해결 방법을 가지고 작업하게 만들었지 만 사용자 정의 트리거가 가장 좋을 것이라는 것에 동의합니다. –

관련 문제