세션의 PCollection을 가져오고 채널/연결 당 평균 세션 지속 시간을 얻으려고합니다. 나는 창문마다 초기 트리거가 발사되는 곳에서 뭔가를하고 있습니다. 60 분짜리 창문이 매 1 분마다 움직이면 초기 트리거가 60 번 발사됩니다. 출력물의 타임 스탬프를 보면 앞으로 60 분 동안 매분마다 창이 열립니다. 방아쇠를 가장 최근의 창에 대해 한 번 발사하고 싶습니다. 매 10 초마다 지난 60 분간 평균 세션 지속 시간을 갖습니다.Apache Beam - 여러 창을 출력하는 Windows 슬라이딩
이전에 슬라이딩 윈도우를 사용했고 예상 한 결과를 얻었습니다. 슬라이딩 및 세션 창을 혼합하여 어떻게 든이 문제를 일으키고 있습니다.
내가 당신에게 내 파이프 라인의 그림을 그릴 보자
첫째, 활성 사용자를 기반으로 세션을 만드는거야 :
.apply("Add Window Sessions",
Window.<KV<String, String>> into(Sessions.withGapDuration(Duration.standardMinutes(60)))
.withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS)
.triggering(
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(10))))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes()
)
.apply("Group Sessions", Latest.perKey())
단계 후이 세션 객체 컴퓨팅 세션 기간을 생성 등 이것은 PCollection (Session)으로 끝납니다.
Pcollection (세션)에서 KV 연결 기간을 만듭니다.
그런 다음 슬라이딩 창을 적용한 다음 평균을 적용합니다.
.apply("Apply Rolling Minute Window",
Window. < KV < String, Integer >> into(
SlidingWindows
.of(Duration.standardMinutes(60))
.every(Duration.standardMinutes(1)))
.triggering(
Repeatedly.forever(
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(10)))
)
)
.withAllowedLateness(Duration.standardMinutes(1))
.discardingFiredPanes()
)
.apply("Get Average", Mean.perKey())
문제를보고있는 시점입니다. 내가보고 싶은데 평균 지속 시간을 가진 키 당 하나의 출력입니다. 내가 실제로보고있는 것은 다음 60 분 동안 매분마다 동일한 키에 대한 60 개의 출력입니다.
EARLY 2017-12-17T20:41:59.999Z
EARLY 2017-12-17T20:43:59.999Z
EARLY 2017-12-17T20:56:59.999Z
(cont)
로그가 12 월에 인쇄 된 : 나는 60 분 미래에이 출력을 타임 스탬프 60 시간을 얻을
LOG.info(c.pane().getTiming() + " " + c.timestamp());
: C가 ProcessContext되고있는 DoFn에서이 로그와
17, 2017 19:35:19. 출력 수는 항상 창 크기/슬라이드 지속 시간입니다. 따라서 5 분마다 60 분짜리 창을 열면 12 개의 출력이 나옵니다.