2017-10-10 5 views
0

beam.Filter의 일부로 사용될 단어 목록이 포함 된 .txt 파일이 GCS에 저장된 경우이 목록을 아파치 빔 파이프 라인에서 동적으로 액세스 할 수 있습니까? 이 목록을 파이프 라인 내에서 전역 변수로 정의 할 수는 있지만 전체 파일을 목록으로 읽는 방법과이 작업을 수행하는 데 필요한 빔 요령이 있는지는 잘 모르겠습니다. 어떤 제안? 당신은 side input로 단어의 목록에 액세스 할 수 있습니다Google Cloud Dataflow 클라우드 저장소의 .txt 파일

답변

3

: "반복 가능한 형태가 아닌 '_InvalidUnpickledPCollection'의 인수되는 형식 오류"여기에 작동하지 않습니다 내 현재 구현 ..

def boolean_terms(word, term_list): 
    if word in term_list: 
    return (word, 1) 
    else: 
    return (word, 0) 

# side table 
filter_terms = p | beam.io.ReadFromText(path_to_gcs_txt_file) 

words = ... 

filtered_words = words | beam.FlatMap(lambda x: 
    [boolean_terms(word, filter_terms) for word in x]) 

나는 다음과 같은 오류 얻을 것입니다 . beam.Filter 변형은 해당 링크의 예제에서 FlatMapParDo과 정확히 같은 방식으로 필터 함수의 측면 입력 사용을 지원한다고 생각합니다. 같은

뭔가 :

words | beam.Filter(lambda x, filter_terms: word in filter_terms, 
        filter_terms=pvalue.AsList(p | beam.io.ReadFromText(path))) 
+0

감사합니다! 나는 내가 더 가깝다고 생각하지만, 여전히 나를 위해 일하는 것처럼 보이지 않습니다. 내가 놓친 게 있니? – reese0106

+0

아, 생각났습니다. 올바르게 작동하려면'pvalue.AsList (filter_terms) '를 추가해야했습니다. – reese0106

관련 문제