우리는 PubSub에서 읽고 일부 필드를 추출하여 bigtable에 쓰는 데이터 흐름 스트리밍 작업이 있습니다. 우리는 자동 축소 기능을 사용할 때 데이터 흐름의 처리량이 떨어지는 것을 관찰하고 있습니다. 예를 들어, 데이터 흐름 작업이 현재 2 명의 작업자로 실행 중이고 100 개의 메시지/초의 비율로 처리중인 경우 자동 증가 중에 100 개의 메시지/초의 비율이 떨어지며 때로는 거의 0으로 떨어지고 500 개의 메시지로 증가합니다 /비서. 우리는 매회 데이터 흐름을 상향 조정하고 있습니다. 이로 인해 autoscaling 중에 시스템 지연이 증가하고 pub/sub에서 확인되지 않은 메시지가 급증합니다.자동 크기 조절 중에 Google 데이터 흐름 처리량이 감소합니다.
이것은 데이터 흐름 자동 확장의 예상되는 동작입니까, 아니면 승인되지 않은 메시지의 향신료가 자동으로 줄어들고 최소화되는 동안이 100 개의 메시지를 유지할 수 있습니까? (제발 참고 : 100 개 개의 메시지/초 500 개 메시지/초는 예를 들어 수치입니다)
작업 ID : 2017-10-23_12_29_09-11538967430775949506
내가 술집/하위 스택 드라이버 및 데이터 흐름의 스크린 샷을 첨부하고 자동 크기 조절.
풀 수의 감소 매번 데이터 흐름 autoscales을 요청이 있습니다 . 타임 스탬프가있는 스크린 샷을 찍을 수는 없지만 풀 요청은 시간 데이터 흐름 자동 확장과 일치합니다. =========== 편집 : ======================
우리는 아래의 방법을 사용하여 GCS에 병렬로 글을 쓰고 있습니다. 언급 된 윈도 윙.
inputCollection.apply("Windowing",
Window.<String>into(FixedWindows.of(ONE_MINUTE))
.triggering(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(ONE_MINUTE))
.withAllowedLateness(ONE_HOUR)
.discardingFiredPanes()
)
//Writing to GCS
.apply(TextIO.write()
.withWindowedWrites()
.withNumShards(10)
.to(options.getOutputPath())
.withFilenamePolicy(
new
WindowedFileNames(options.getOutputPath())));
WindowedFileNames.java
public class WindowedFileNames extends FilenamePolicy implements OrangeStreamConstants{
/**
*
*/
private static final long serialVersionUID = 1L;
private static Logger logger = LoggerFactory.getLogger(WindowedFileNames.class);
protected final String outputPath;
public WindowedFileNames(String outputPath) {
this.outputPath = outputPath;
}
@Override
public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext context, String extension) {
IntervalWindow intervalWindow = (IntervalWindow) context.getWindow();
DateTime date = intervalWindow.maxTimestamp().toDateTime(DateTimeZone.forID("America/New_York"));
String fileName = String.format(FOLDER_EXPR, outputPath, //"orangestreaming",
DAY_FORMAT.print(date), HOUR_MIN_FORMAT.print(date) + HYPHEN + context.getShardNumber());
logger.error(fileName+"::::: File name for the current minute");
return outputDirectory
.getCurrentDirectory()
.resolve(fileName, StandardResolveOptions.RESOLVE_FILE);
}
@Override
public ResourceId unwindowedFilename(ResourceId outputDirectory, Context context, String extension) {
return null;
}
}
이 부분을 보시고 – Pablo