2016-12-08 4 views
2

편집 : 해결 :!아파치 빔 소스 가져 오기 파일 이름

여러 언어로 된 여러 개의 텍스트 파일이 있습니다. Apache Beam을 사용하여 각 행에 언어 태그를 추가하고 싶습니다.

예 :

파일 text_en : This is a sentence.

파일 text_de :

en: This is a sentence. 
de: Dies ist ein Satz. 
: Dies ist ein Satz.

내가 원하는 것은 이것이다

내가 무엇을 시도했다 : 내가 처음에 한 TextIO.Read.From(dataSetDirectory+"/*")를 사용 .getSource() 같이 보입니다 옵션을 찾기 위해 노력

. 그러나 이것은 존재하지 않는 것 같습니다. 그러나 내 DoFn LanguageTagAdder 첫 번째 언어로 초기화되고, 따라서 모든 파일을 완벽하게 정상적으로이 작품이 방법으로 파일을 읽기

File[] files = new File(datasetDirectory).listFiles(); 
PCollectionList<String> dataSet=null; 
for (File f: files) { 
    String language = f.getName(); 
    logger.debug(language); 
    PCollection<String> newPCollection = p.apply(
      TextIO.Read.from(f.getAbsolutePath())) 
       .apply(ParDo.of(new LanguageTagAdder(language))); 

    if (dataSet==null) { 
     dataSet=PCollectionList.of(newPCollection); 
    } else { 
     dataSet.and(newPCollection); 
    } 
} 
PCollection<String> completeDataset= dataSet.apply(Flatten.<String>pCollections()) 

:

다음은이 같은 하나 모든 파일을 읽으려고 동일한 추가 언어.

LanguageTagAdder

은 다음과 같습니다

public class LanguageTagAdder 
      extends DoFn<String,String> { 

     private String language; 
     public LanguageTagAdder(String language) { 
      this.language=language; 
     } 
     @ProcessElement 
     public void processElement(ProcessContext c) { 
      c.output(language+c.element()); 
     } 
    } 

나는이 문제가 의도하고 데이터가 parrallelized 할 수 있도록 필요 실현,하지만 난 내 문제를 해결하는 방법을 갈 것인가? 빔이 있습니까? - 해결 방법은 있습니까?

PS : EDIT

DEBUG 2016-12-05 17:09:55,070 [main] de.kdld16.hpi.FusionDataset - en 
DEBUG 2016-12-05 17:09:56,216 [main] de.kdld16.hpi.FusionDataset - de 
WARN 2016-12-05 17:09:56,219 [main] org.apache.beam.sdk.Pipeline - Transform TextIO.Read2 does not have a stable unique name. This will prevent updating of pipelines. 

: 문제 있던 라인

dataSet.and(newPCollection);

에게 (제 2 언어로) 번째 시간 동안 new LanguageTagAdder 작성시 I 경고 따라 얻을

다음과 같이 다시 써야합니다.

dataSet=dataSet.and(newPCollection);

데이터 세트에는 첫 번째 파일 만 포함되어있었습니다 .... 그들은 모두 동일한 언어 태그를 가지고 있습니다.

dataSet=dataSet.and(newPCollection); 

그것이 방법은 데이터 집합은 첫 번째 파일을 포함 :

+0

"다음으로 모든 파일을 하나씩 읽으려고했습니다."라는 예는 효과가 있으며 현재 원하는 것을 얻기 위해 빔에서 가장 쉬운 방법입니다. LanguageTagAdder가 첫 번째 언어로만 초기화된다는 것을 어떻게 발견했는지에 대해 자세히 설명해 주시겠습니까? 귀하의 코드는 LanguageTagAdder의 다른 인스턴스를 생성하고 있으며 Beam은 그대로 사용해야합니다. 실제 상황에서 그런 상황이 아니라고 말하는 것입니까? – jkff

+0

또한 어떤 주자를 사용하고 있습니까? DirectRunner가 아닌 경우 : DirectRunner에서 문제가 재현됩니까? – jkff

+0

DirectRunner를 사용하고 있습니다. 대부분의 LanguageTagAdders는 (많은 다른 언어로) 초기화되며, 생성자에 중단 점을 넣을 때 볼 수 있습니다. 그러나 모든 텍스트 파일의 모든 줄에는 동일한 언어 태그 "en"만 있습니다. 은'''코드는 LanguageTagAdder의 다른 인스턴스를 생성하고, 그대로 빔을 사용한다 : 당신이 실제로 일어나고있는 일이 아니다 말하는''' 예이 정확히 말인지 무슨. – JoSauderGH

답변

2

문제는로 다시 작성하는 데 필요한 라인

dataSet.and(newPCollection); 

했다.

관련 문제