flink 카프카 소비자와 함께 스트림 (kafka 메시지가 주제에 스트리밍 됨)이 있으며 흥미로운 행동을 느낍니다.Flink : 스트림이 끝날 때 창에서 데이터를 처리하지 않습니다.
데이터가 스트리밍 될 때, "완료"전에 데이터가 중지되거나 데이터가 끝나고 (몇 개의 창 후에) 창 끝까지 도달하지 않으면 나머지 파이프 라인은 방아쇠하지 마.
예 흐름 :
env.addSource(kafkaConsumer)
.flatMap(new TokenMapper())
.keyBy("word")
.window(TumblingEventTimeWindows.of(Time.seconds(10L)))
.reduce(new CountTokens())
.flatMap(new ConvertToString())
.addSink(producer);
내가 TimeCharacteristic이 EVENTTIME로 설정 ENV으로 FlinkKafkaConsumer010을 사용하고 있습니다. 및 consumer.assignTimestampsAndWatermarks (새 PeriodicWatermarks()) 내 창을 10 초라고하고, 내 데이터 스트림은 (일정 기간 동안 스트리밍을 중지 한 후 등) 데이터를 8 초이 포함 된 경우
private static class PeriodicWatermarks implements AssignerWithPeriodicWatermarks<String>{
private long currentMaxTimestamp;
private final long maxOutOfOrderness;
public PeriodicWatermarksAuto(long maxOutOfOrderness){
this.maxOutOfOrderness = maxOutOfOrderness;
}
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
@Override
public long extractTimestamp(String t, long l) {
// this should be the event timestamp
currentMaxTimestamp = l;
logger.info("TIMESTAMP: " + l);
return l;
}
}
의 flatMap- 새로운 나중에 데이터에 스트리밍 될 때까지> 싱크 처리하지
예 데이터 스트림 처리 문제 :. 예를 들어, I (35)를 가지고 마찬가지로 경우
xxxxxxxx(8secs)------(gap)--(later more data)xxxxx
^(not processed) (until I get here)^
(X는 각각 제 당 데이터의 조각이다) 초당 스트리밍 가치 ata (그리고 다시 내 창은 10 초) 데이터 트리거 만 3 창, 나머지 5 초 분량의 데이터는 처리되지 않습니다.
...xxxxxxxxxx(10secs)xxxxx(5secods)------(gap)--(later more data)xxxxx
(processed) ^(not processed) (until I get here)^
내 창은 10 초입니다 내가 스트리밍 데이터를 5 초 flatmap- 만이 마지막 경우> 일이 결코 가라.
제 질문은 입니다. 잠시 후 데이터가 표시되지 않으면 처리 할 창 데이터를 트리거하는 방법은 무엇입니까?
내 데이터가 실시간 스트리밍되는 경우 아무런 데이터가없는 것을 볼 수 있으며 마지막 창 (단지 5 초 분량의 데이터 만 표시)이 원하지 않을 수도 있습니다. 새로운 데이터가 들어올 때까지 시간, 창 시간이 지난 후에 그 마지막 창에 대한 결과를 원할 것입니다.
큰 소리로 생각해 보면 이것은 ProcessingTime 대신 EventTime을 사용했기 때문인 것으로 보이거나 실제로 워터 마크를 마지막 창에 표시하기 위해 워터 마크가 제대로 생성되지 않고있는 것 같습니다. 스트림이 마지막 비트가 끝나지 않으면이 문제는 누구에게나 문제가 될 것이라고 생각합니다. 나는 아마도 end-of-stream msg를 보낼 수 있다고 말하고 싶지만 소스가 스트림을 깨기 때문에 스팀이 끝나면 도움이되지 않습니다.
편집 : 그래서 처리 시간이 바뀌고 마지막 창에서 데이터를 올바르게 처리하므로 EventTime이 원인을 추측합니다. 맞춤 트리거 또는 적절한 창 워터 마크가 대답 일 수 있다고 생각합니다. .
도움을 주셔서 감사합니다!
시간을 처리하기 위해 사용자 정의 트리거를 창에 추가하는 것이 좋습니다. eventTime 창은 이와 같은 이유로 맞춤 처리 시간 트리거와 함께 사용되는 경우가 많습니다. – Jicaar
그게 내가 생각하고 있었던거야. 내가 해결 방법을 가지고 작업하게 만들었지 만 사용자 정의 트리거가 가장 좋을 것이라는 것에 동의합니다. –