제공된 키 (호핑) 시간대에 T
의 임계 값 N
보다 자주 나타나는 키를 필터링하려고합니다. 예를 들어Kafka Streams - 시간 창에 자주 나타나는 메시지 필터링
다음 스트림 : 위에서 불가능한 경우
#time, key
0, A
1, B
2, A
3, C
4, D
5, A
6, B
7, C
8, C
9, D
10, A
11, D
12, D
13, D
14, D
15, D
및 N=2
및 T=3
, 결과가 다르게
0, A
2, A
7, C
8, C
9, D
11, D
12, D
13, D
14, D
15, D
되어야, 단순화 만하는 것 임계 값 충족 후 메시지 필터링 :
#time, key
2, A
8, C
11, D
12, D
13, D
14, D
15, D
Kafka Streams에서 이것이 가능합니까?
지금까지 windowed count
(인스턴스의 KTable
)을 만들고 원본 스트림에 다시 연결하려고했습니다. windowed count
의 키를 KTable#toStream((k,v) -> k.key())
을 사용하고 dummy aggregation을 다시 수행하여 KTable
의 인스턴스로 다시 원래 키로 변경합니다. 이것은 leftJoin
에 임계 값을 초과 한 후 매우 가까이 오는 메시지를 놓치게하는 지연을 유발하는 것 같습니다.
wcount
에 각각
UPSERT
때문에 중복 출력 결과
source.join(wcount, (leftValue, rightValue) -> rightValue, JoinWindows.of(TimeUnit.SECONDS.toMillis(5)),stringSerde, stringSerde, longSerde)
.filter((k,v) -> v!=null)
.filter((k,v) -> v>1)
.print("output");
이벤트를 트리거 (더미 응집을 떠나는) KStream
적절한 창 가입 -
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
KStream<String, Long> wcount = source.groupByKey()
.count(TimeWindows.of(TimeUnit.SECONDS.toMillis(5)),"Counts")
.toStream((k,v) -> k.key());
// perform dummy aggregation to get KTable
KTable<String, Long> wcountTable = wcount.groupByKey(stringSerde, longSerde)
.reduce((aggValue, newValue) -> newValue,
"dummy-aggregation-store");
// left join and filter with threshold N=1
source.leftJoin(wcountTable, (leftValue, rightValue) -> rightValue,stringSerde, stringSerde)
.filter((k,v) -> v!=null)
.filter((k,v) -> v>1)
.print("output");
는 I 또한
KStream
수행 시도 .
감사합니다, 내가 원하는대로이 작업을 (거의) 얻었다. [텀블링 창] (https://kafka.apache.org/0110/documentation/streams/developer-guide#streams_dsl_windowing)은 새로운 시간 창의 시작 부분에있는 이벤트를 제외하고 작동합니다 (관련 이벤트가있는 경우). 선행하는 창).호핑 윈도우의 경우, 다중 윈도우가 단일 트리거링 이벤트 시퀀스를 포함 할 수 있으므로 출력 스트림에 중복이 있습니다. 그래서 저는 Processor API와 창 영속 저장소를 사용하여 솔루션을 구현하기로했습니다. –