1

Dataflow 작업에서 Google Cloud Storage에 저장된 맞춤형 바이너리 파일을 처리해야합니다.FileBasedSource가 Google Cloud Storage의 여러 특정 파일에 해당하는 glob을 인식하지 못합니다.

이렇게하려면 맞춤 FileBasedSource을 작성했습니다. 문서에 나와 있듯이 은 Java glob, 단일 파일 또는 단일 파일의 오프셋 범위로 정의 된 파일 패턴으로 뒷받침됩니다.

제 경우에는 /path/{file1,file1,file3}과 같이 특정 파일 이름을 가진 Java glob을 사용해야합니다. 때 나는 그것이 잘 작동 로컬 파일 시스템에서 테스트하지만 구글 클라우드 스토리지 (gs://bucket/{file1,file2,file3})와 함께 사용할 경우 어떤 파일을 찾을 수 없습니다 있고 난 다음 스택 트레이스 얻을 : 나는 정확한이를 사용하는 경우

java.io.IOException: Error executing batch GCS request 
     at org.apache.beam.sdk.util.GcsUtil.executeBatches(GcsUtil.java:603) 
     at org.apache.beam.sdk.util.GcsUtil.getObjects(GcsUtil.java:342) 
     at org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.matchNonGlobs(GcsFileSystem.java:217) 
     at org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.match(GcsFileSystem.java:86) 
     at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:111) 
     at org.apache.beam.sdk.io.FileBasedSource.getEstimatedSizeBytes(FileBasedSource.java:207) 
     at org.apache.beam.runners.dataflow.internal.CustomSources.serializeToCloudSource(CustomSources.java:78) 
     at org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:53) 
     at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:40) 
     at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:37) 
     at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:439) 
     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:602) 
     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:594) 
     at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:276) 
     at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:210) 
     at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:440) 
     at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:383) 
     at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:173) 
     at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:556) 
     at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:167) 
     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297) 
     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283) 
     at com.travelaudience.data.job.rtbtobigquery.Main$.main(Main.scala:74) 
     at com.travelaudience.data.job.rtbtobigquery.Main.main(Main.scala) 
Caused by: java.util.concurrent.ExecutionException: com.google.api.client.http.HttpResponseException: 400 Bad Request 
     at org.apache.beam.sdks.java.extensions.google.cloud.platform.core.repackaged.com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:500) 
     at org.apache.beam.sdks.java.extensions.google.cloud.platform.core.repackaged.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:479) 
     at org.apache.beam.sdks.java.extensions.google.cloud.platform.core.repackaged.com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:76) 
     at org.apache.beam.sdk.util.GcsUtil.executeBatches(GcsUtil.java:595) 
     ... 23 more 
Caused by: com.google.api.client.http.HttpResponseException: 400 Bad Request 
     at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1070) 
     at com.google.api.client.googleapis.batch.BatchRequest.execute(BatchRequest.java:241) 
     at org.apache.beam.sdk.util.GcsUtil$3.call(GcsUtil.java:588) 
     at org.apache.beam.sdk.util.GcsUtil$3.call(GcsUtil.java:586) 
     at org.apache.beam.sdks.java.extensions.google.cloud.platform.core.repackaged.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111) 
     at org.apache.beam.sdks.java.extensions.google.cloud.platform.core.repackaged.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58) 
     at org.apache.beam.sdks.java.extensions.google.cloud.platform.core.repackaged.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
     at java.lang.Thread.run(Thread.java:748) 

을 같은 gsutil와 같은 glob이 gsutil ls gs://bucket/{file1,file2,file3} 그것은 3 파일을 올바르게 나열합니다. 코드에서이 같은 glob이 gs://bucket/dir/* 작동합니다.

빔 버전 2.1.0을 사용합니다.

무엇이 잘못 되었습니까?

도움 주셔서 감사합니다.

답변

1

일치하는 GCS 파일에 대해 only a subset of the glob syntax을 지원합니다. 이 아닌 *?을 지원합니다. 우리의 문서는 현재 이것을 잘 설명하지 못합니다. 아마도 FileSystems.match()에 문서화되어 있어야하고 glob 일치가 나타나는 다른 클래스와 링크되어 있어야합니다.

+0

답장을 보내 주셔서 감사합니다. '{'또는'? '를 지원할 계획입니까? 이것은 정말로 도움이 될 것입니다. – bnjzer

+0

나는 특별히이 작업을하는 사람을 알지 못하지만 물론 환영 할만한 기여가 될 것입니다. Beam 2.2 (이미 마스터에서 사용 가능하며 릴리스는 현재 투표 중임)를 사용하여 읽을 파일이 여러 개인 경우 FileIO.match()/matchAll()/read()를 사용하는 것이 좋습니다. FileBasedSource를 구현하는 것보다 더 적은 상용구가 필요합니다. – jkff

관련 문제