2017-11-01 1 views
0

우리는 PubSub에서 읽고 일부 필드를 추출하여 bigtable에 쓰는 데이터 흐름 스트리밍 작업이 있습니다. 우리는 자동 축소 기능을 사용할 때 데이터 흐름의 처리량이 떨어지는 것을 관찰하고 있습니다. 예를 들어, 데이터 흐름 작업이 현재 2 명의 작업자로 실행 중이고 100 개의 메시지/초의 비율로 처리중인 경우 자동 증가 중에 100 개의 메시지/초의 비율이 떨어지며 때로는 거의 0으로 떨어지고 500 개의 메시지로 증가합니다 /비서. 우리는 매회 데이터 흐름을 상향 조정하고 있습니다. 이로 인해 autoscaling 중에 시스템 지연이 증가하고 pub/sub에서 확인되지 않은 메시지가 급증합니다.자동 크기 조절 중에 Google 데이터 흐름 처리량이 감소합니다.

이것은 데이터 흐름 자동 확장의 예상되는 동작입니까, 아니면 승인되지 않은 메시지의 향신료가 자동으로 줄어들고 최소화되는 동안이 100 개의 메시지를 유지할 수 있습니까? (제발 참고 : 100 개 개의 메시지/초 500 개 메시지/초는 예를 들어 수치입니다)

작업 ID : 2017-10-23_12_29_09-11538967430775949506

내가 술집/하위 스택 드라이버 및 데이터 흐름의 스크린 샷을 첨부하고 자동 크기 조절.

enter image description here

enter image description here

풀 수의 감소 매번 데이터 흐름 autoscales을 요청이 있습니다 enter image description here. 타임 스탬프가있는 스크린 샷을 찍을 수는 없지만 풀 요청은 시간 데이터 흐름 자동 확장과 일치합니다. =========== 편집 : ======================

우리는 아래의 방법을 사용하여 GCS에 병렬로 글을 쓰고 있습니다. 언급 된 윈도 윙.

inputCollection.apply("Windowing", 
      Window.<String>into(FixedWindows.of(ONE_MINUTE)) 

.triggering(AfterProcessingTime.pastFirstElementInPane() 
           .plusDelayOf(ONE_MINUTE)) 
        .withAllowedLateness(ONE_HOUR) 
        .discardingFiredPanes() 
       ) 

    //Writing to GCS 
       .apply(TextIO.write() 
          .withWindowedWrites() 
          .withNumShards(10) 
          .to(options.getOutputPath()) 
          .withFilenamePolicy(
            new 
WindowedFileNames(options.getOutputPath()))); 

WindowedFileNames.java

public class WindowedFileNames extends FilenamePolicy implements OrangeStreamConstants{ 

/** 
* 
*/ 
private static final long serialVersionUID = 1L; 
private static Logger logger = LoggerFactory.getLogger(WindowedFileNames.class); 
protected final String outputPath; 
public WindowedFileNames(String outputPath) { 
    this.outputPath = outputPath; 
} 


@Override 
public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext context, String extension) { 
    IntervalWindow intervalWindow = (IntervalWindow) context.getWindow(); 
    DateTime date = intervalWindow.maxTimestamp().toDateTime(DateTimeZone.forID("America/New_York")); 

    String fileName = String.format(FOLDER_EXPR, outputPath, //"orangestreaming", 
      DAY_FORMAT.print(date), HOUR_MIN_FORMAT.print(date) + HYPHEN + context.getShardNumber()); 
    logger.error(fileName+"::::: File name for the current minute"); 
    return outputDirectory 
       .getCurrentDirectory() 
       .resolve(fileName, StandardResolveOptions.RESOLVE_FILE); 
} 

@Override 
public ResourceId unwindowedFilename(ResourceId outputDirectory, Context context, String extension) { 
    return null; 
} 





} 
+0

이 부분을 보시고 – Pablo

답변

0

무엇 실제로 일어나는 것은 처리량이 먼저 감소하고 있다는 것입니다, 그 노동자가 확장되는 이유입니다. 당신 1:30 오전 주위에 당신의 파이프 라인을 보면

는 일련의 사건은 그렇게 같다 :

  1. 오전 1시 23분 주변, 처리량이 떨어진다. 이것은 백 로그를 구축합니다.
  2. 오전 1:28 경에 파이프 라인이 차단 해제되고 진행이 시작됩니다.
  3. 큰 백 로그 때문에 파이프 라인은 최대 30 명의 작업자를 확장합니다. 당신이 배율이 자동 UI를 보면

또한, 30 명 노동자에 올라가고에 대한 정당화는 다음과 같습니다

는 "파이프 라인과 을 잡을 수 있도록하는 것이 30 노동자의 수를 제기의 백 로그를 사용하고 입력 속도를 유지하십시오. "

희망 하시겠습니까?

+0

답장을 보내 주셔서 감사합니다. 처리량이 떨어지는 이유는 무엇입니까? 어떻게 디버깅 할 수 있습니까? – rhg

+0

처리량의 저하는 BigTable에 느린 쓰기로 인한 것일 수 있습니다. 이 문제는 충분히 큰 문제일까요? – Pablo

+0

메시지에서 일부 필드를 추출하고 큰 테이블에 쓰고 병렬로 창 원시 메시지를 1 분의 고정 창으로 만들고 우리는 10 개 파편이있는 창으로 작성된 쓰기를 사용하여 GCS에 원시 데이터를 씁니다. GCS에 쓰면 처리량이 떨어지는 지 확실하지 않습니다. – rhg

관련 문제