답변

2

다음은 Java SDK (BEAM 2.1.0)를 사용하는 경우 제공되는 예제입니다.

PipelineOptions options = PipelineOptionsFactory.fromArgs(args) 
                .withValidation() 
                .as(PipelineOptions.class); 

Pipeline pipeline = Pipeline.create(options); 

pipeline.begin() 
       .apply("PubsubIO",PubsubIO.readStrings() 
        .withTimestampAttribute("timestamp") 
        .fromSubscription("projects/YOUR-PROJECT/subscriptions/YOUR-SUBSCRIPTION")) 
       .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30L)))) 
       .apply(TextIO.write().to("gs://YOUR-BUCKET").withWindowedWrites()); 

당신은 SDK가 TextIO.Write.expand (PCollection 입력)에서 "확장"방법을 탐색하여 파일 이름 지정에 사용하는 기본값을 볼 수 있습니다. 특히 DefaultFilenamePolicy.java를 살펴 보겠습니다.

+2

setStreaming을 호출 할 필요가 없습니다. 더 중요한 것은 withWindowedWrites를 사용하기 위해 윈도우를 추가해야한다는 것입니다. – jkff

+1

방금 ​​StreamingOptions를 통해 읽었습니다. 파이프 라인에 Unbounded PCollection이 포함되어 있으면 기본값이 true로 표시됩니다. 또한 코드 스 니펫을 윈도우 논리를 포함하도록 업데이트하고 PipelineOptions에서 중복 setStreaming 구성을 제거했습니다. –

+1

저에게 도움이 된 감사합니다 @NalseezDuke !!! – rish0097

관련 문제