2017-12-17 3 views
0

세션의 PCollection을 가져오고 채널/연결 당 평균 세션 지속 시간을 얻으려고합니다. 나는 창문마다 초기 트리거가 발사되는 곳에서 뭔가를하고 있습니다. 60 분짜리 창문이 매 1 분마다 움직이면 초기 트리거가 60 번 발사됩니다. 출력물의 타임 스탬프를 보면 앞으로 60 분 동안 매분마다 창이 열립니다. 방아쇠를 가장 최근의 창에 대해 한 번 발사하고 싶습니다. 매 10 초마다 지난 60 분간 평균 세션 지속 시간을 갖습니다.Apache Beam - 여러 창을 출력하는 Windows 슬라이딩

이전에 슬라이딩 윈도우를 사용했고 예상 한 결과를 얻었습니다. 슬라이딩 및 세션 창을 혼합하여 어떻게 든이 문제를 일으키고 있습니다.

내가 당신에게 내 파이프 라인의 그림을 그릴 보자

첫째, 활성 사용자를 기반으로 세션을 만드는거야 :

.apply("Add Window Sessions", 
Window.<KV<String, String>> into(Sessions.withGapDuration(Duration.standardMinutes(60))) 
    .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS) 
    .triggering(
    AfterWatermark.pastEndOfWindow() 
     .withEarlyFirings(AfterProcessingTime 
     .pastFirstElementInPane() 
     .plusDelayOf(Duration.standardSeconds(10)))) 
    .withAllowedLateness(Duration.ZERO) 
    .discardingFiredPanes() 
) 
.apply("Group Sessions", Latest.perKey()) 

단계 후이 세션 객체 컴퓨팅 세션 기간을 생성 등 이것은 PCollection (Session)으로 끝납니다.

Pcollection (세션)에서 KV 연결 기간을 만듭니다.

그런 다음 슬라이딩 창을 적용한 다음 평균을 적용합니다.

.apply("Apply Rolling Minute Window", 
     Window. < KV < String, Integer >> into(
     SlidingWindows 
     .of(Duration.standardMinutes(60)) 
     .every(Duration.standardMinutes(1))) 
     .triggering(
     Repeatedly.forever(
     AfterWatermark.pastEndOfWindow() 
     .withEarlyFirings(AfterProcessingTime 
     .pastFirstElementInPane() 
     .plusDelayOf(Duration.standardSeconds(10))) 
     ) 
    ) 
     .withAllowedLateness(Duration.standardMinutes(1)) 
     .discardingFiredPanes() 
    ) 
    .apply("Get Average", Mean.perKey()) 

문제를보고있는 시점입니다. 내가보고 싶은데 평균 지속 시간을 가진 키 당 하나의 출력입니다. 내가 실제로보고있는 것은 다음 60 분 동안 매분마다 동일한 키에 대한 60 개의 출력입니다.

EARLY 2017-12-17T20:41:59.999Z 
EARLY 2017-12-17T20:43:59.999Z 
EARLY 2017-12-17T20:56:59.999Z 
(cont) 

로그가 12 월에 인쇄 된 : 나는 60 분 미래에이 출력을 타임 스탬프 60 시간을 얻을

LOG.info(c.pane().getTiming() + " " + c.timestamp()); 

: C가 ProcessContext되고있는 DoFn에서이 로그와

17, 2017 19:35:19. 출력 수는 항상 창 크기/슬라이드 지속 시간입니다. 따라서 5 분마다 60 분짜리 창을 열면 12 개의 출력이 나옵니다.

답변

0

나는 이것을 이해했다고 생각합니다.

슬라이딩 창은 .every() 함수를 사용하여 새 창을 만듭니다. 조기 발화 설정은 각 창에 적용되므로 여러 번 발사하는 것이 좋습니다.

"현재 창"만 출력하려면 결과를 출력하고 .every()를 조정하여 주파수를 제어하기 전에 c.pane(). isFirst() == true를 검사합니다. .

관련 문제