1
Apache Beam에서 TextIO를 사용하여 PubSub에서받은 메시지를 GCS의 텍스트 파일에 쓰는 방법은 무엇입니까? withWindowedWrites() 및 withFilenamePolicy()와 같은 일부 메소드를 보았지만 문서에서 해당 메소드의 예를 찾을 수 없습니다.Apache Beam을 사용하여 GCS에 스트리밍 데이터 쓰기
Apache Beam에서 TextIO를 사용하여 PubSub에서받은 메시지를 GCS의 텍스트 파일에 쓰는 방법은 무엇입니까? withWindowedWrites() 및 withFilenamePolicy()와 같은 일부 메소드를 보았지만 문서에서 해당 메소드의 예를 찾을 수 없습니다.Apache Beam을 사용하여 GCS에 스트리밍 데이터 쓰기
다음은 Java SDK (BEAM 2.1.0)를 사용하는 경우 제공되는 예제입니다.
PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(PipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);
pipeline.begin()
.apply("PubsubIO",PubsubIO.readStrings()
.withTimestampAttribute("timestamp")
.fromSubscription("projects/YOUR-PROJECT/subscriptions/YOUR-SUBSCRIPTION"))
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30L))))
.apply(TextIO.write().to("gs://YOUR-BUCKET").withWindowedWrites());
당신은 SDK가 TextIO.Write.expand (PCollection 입력)에서 "확장"방법을 탐색하여 파일 이름 지정에 사용하는 기본값을 볼 수 있습니다. 특히 DefaultFilenamePolicy.java를 살펴 보겠습니다.
setStreaming을 호출 할 필요가 없습니다. 더 중요한 것은 withWindowedWrites를 사용하기 위해 윈도우를 추가해야한다는 것입니다. – jkff
방금 StreamingOptions를 통해 읽었습니다. 파이프 라인에 Unbounded PCollection이 포함되어 있으면 기본값이 true로 표시됩니다. 또한 코드 스 니펫을 윈도우 논리를 포함하도록 업데이트하고 PipelineOptions에서 중복 setStreaming 구성을 제거했습니다. –
저에게 도움이 된 감사합니다 @NalseezDuke !!! – rish0097