2017-10-19 6 views
0

제공된 키 (호핑) 시간대에 T의 임계 값 N보다 자주 나타나는 키를 필터링하려고합니다. 예를 들어Kafka Streams - 시간 창에 자주 나타나는 메시지 필터링

다음 스트림 : 위에서 불가능한 경우

#time, key 
0, A 
1, B 
2, A 
3, C 
4, D 
5, A 
6, B 
7, C 
8, C 
9, D 
10, A 
11, D 
12, D 
13, D 
14, D 
15, D 

N=2T=3, 결과가 다르게

0, A 
2, A 
7, C 
8, C 
9, D 
11, D 
12, D 
13, D 
14, D 
15, D 

되어야, 단순화 만하는 것 임계 값 충족 후 메시지 필터링 :

#time, key 
2, A 
8, C 
11, D 
12, D 
13, D 
14, D 
15, D 

Kafka Streams에서 이것이 가능합니까?

지금까지 windowed count (인스턴스의 KTable)을 만들고 원본 스트림에 다시 연결하려고했습니다. windowed count의 키를 KTable#toStream((k,v) -> k.key())을 사용하고 dummy aggregation을 다시 수행하여 KTable의 인스턴스로 다시 원래 키로 변경합니다. 이것은 leftJoin에 임계 값을 초과 한 후 매우 가까이 오는 메시지를 놓치게하는 지연을 유발하는 것 같습니다.

wcount에 각각 UPSERT 때문에 중복 출력 결과
source.join(wcount, (leftValue, rightValue) -> rightValue, JoinWindows.of(TimeUnit.SECONDS.toMillis(5)),stringSerde, stringSerde, longSerde) 
      .filter((k,v) -> v!=null) 
      .filter((k,v) -> v>1) 
      .print("output"); 

이벤트를 트리거 (더미 응집을 떠나는) KStream 적절한 창 가입 -

final Serde<String> stringSerde = Serdes.String(); 
    final Serde<Long> longSerde = Serdes.Long(); 

    KStream<String, Long> wcount = source.groupByKey() 
      .count(TimeWindows.of(TimeUnit.SECONDS.toMillis(5)),"Counts") 
      .toStream((k,v) -> k.key()); 

    // perform dummy aggregation to get KTable 
    KTable<String, Long> wcountTable = wcount.groupByKey(stringSerde, longSerde) 
       .reduce((aggValue, newValue) -> newValue, 
       "dummy-aggregation-store"); 

    // left join and filter with threshold N=1 
    source.leftJoin(wcountTable, (leftValue, rightValue) -> rightValue,stringSerde, stringSerde) 
      .filter((k,v) -> v!=null) 
      .filter((k,v) -> v>1) 
      .print("output"); 

는 I 또한 KStream 수행 시도 .

답변

1

. 목록의 모든 원시 데이터를 수집하는 윈도우 집계를 적용 할 수 있습니다 (즉, 수동으로 윈도우를 구체화). 그런 다음 창을 평가하는 flatMap을 적용합니다. 임계 값이 아직 충족되지 않으면 아무 것도 방출하지 않습니다. 임계 값이 처음 충족되면 모두 버퍼 된 데이터를 내 보냅니다. 임계 값보다 큰 수의 flatMap 이후의 모든 호출에 대해 목록에서 최신 값만 방출합니다 (이전에 flatMap에 대한 호출을 다른 모든 것으로 내 보낸 것, 즉 새로 추가 한 것만 방출 함).

참고 : KTable 캐시를 비활성화해야합니다. 즉, config 매개 변수 "cache.max.bytes.buffering"= 0을 설정해야합니다. 그렇지 않으면 알고리즘이 제대로 작동하지 않습니다. 이 같은

뭔가 :

KStream<Windowed<K>, List<V>> windows = stream.groupByKey() 
               .aggregate(
               /*init with empty list*/, 
               /*add value to list in agg*/, 
               TimeWindows.of()...), 
               ...) 
               .toStream(); 
KStream<K,V> thresholdMetStream = windows.flatMap(
              /* if List#size < threshold 
               then return empty-list, ie, nothing 
               elseif List#size == threshold 
               then return whole list 
               else [List#size > threshold] 
               then return last element from list 
              */); 
+1

감사합니다, 내가 원하는대로이 작업을 (거의) 얻었다. [텀블링 창] (https://kafka.apache.org/0110/documentation/streams/developer-guide#streams_dsl_windowing)은 새로운 시간 창의 시작 부분에있는 이벤트를 제외하고 작동합니다 (관련 이벤트가있는 경우). 선행하는 창).호핑 윈도우의 경우, 다중 윈도우가 단일 트리거링 이벤트 시퀀스를 포함 할 수 있으므로 출력 스트림에 중복이 있습니다. 그래서 저는 Processor API와 창 영속 저장소를 사용하여 솔루션을 구현하기로했습니다. –

0

AFAIK AFAIK는 Count-Min-Sketch 알고리즘에 가장 적합합니다. 예를 들어 스트림-lib 디렉토리 구현을 참조하십시오 : 이것은 확실히 가능하다

https://github.com/addthis/stream-lib

관련 문제