2014-06-16 1 views
1

지금까지 Google App Engine에 3 개의 MapReduce 작업을 연결했습니다. 각 MapReduce 작업 사이에서 출력을 산출하고 그 출력을 다음 MapReduce 작업으로 전달합니다. 이제는 서로 다른 MapReduce 작업의 두 가지 출력을 단일 MapReduce 작업으로 전달하고 싶습니다. 누구든지이 작업을 수행하는 방법을 제안 할 수 있습니까?Google App Engine에서 세 번째 MapReduce 작업 (여러 개의 BlobKey/출력 전달)을 MapReduce 작업에 전달하기

num_shards=2 
# First define the parent pipeline job 
class RecommenderPipeline(base_handler.PipelineBase): 
"""A pipeline to run Recommender demo. 

Args: 
blobkey: blobkey to process as string. Should be a zip archive with 
    text files inside. 
""" 

def run(self, filekey, blobkey, itr): 
    logging.debug("------------------PIPELINE FILEKEY is %s" % filekey) 
    logging.debug("------------------PIPELINE BLOBKEY is %s" % blobkey) 
    output1 = yield mapreduce_pipeline.MapreducePipeline(
    "recommender", 
    "main.recommender_group_by_user_rating_map1", 
    "main.recommender_count_ratings_user_freq_reduce1", 
    "mapreduce.input_readers.BlobstoreLineInputReader", 
    "mapreduce.output_writers.BlobstoreOutputWriter", 
    mapper_params={ 
     "blob_keys": blobkey, 
    }, 
    reducer_params={ 
     "mime_type": "text/plain", 
    }, 
    shards=num_shards) 
    output1yield = yield BlobKeys(output1) 

    # Code below takes output1 and feeds into second mapreduce job. 
    # Pipeline library ensures that the second pipeline depends on first and 
    # does not launch until the first has resolved. 
    output2 = (
    yield mapreduce_pipeline.MapreducePipeline(
    "recommender", 
    "main.recommender_pairwise_items_map2", 
    "main.recommender_calc_similarity_reduce2", 
    "mapreduce.input_readers.BlobstoreLineInputReader", 
    "mapreduce.output_writers.BlobstoreOutputWriter", 
    mapper_params=(output1yield), #see BlobKeys Class!` 
    reducer_params={ 
     "mime_type": "text/plain", 
    }, 
    shards=num_shards)) 
    output2yield = yield BlobKeys(output2) 

    # Code below takes output2 and feeds into third mapreduce job. 
    # Pipeline library ensures that the third pipeline depends on second and 
    # does not launch until the second has resolved. 
    output3 = yield mapreduce_pipeline.MapreducePipeline(
    "recommender", 
    "main.recommender_weighted_sum_map3", 
    "main.recommender_weighted_sum_reduce3", 
    "mapreduce.input_readers.BlobstoreLineInputReader", 
    "mapreduce.output_writers.BlobstoreOutputWriter", 
    mapper_params=(output2yield), #see BlobKeys Class!` 
    reducer_params={ 
     "mime_type": "text/plain", 
    }, 
    shards=num_shards) 
    yield StoreOutput("Recommender", filekey, output3, itr) #stores key to results 

답변

0

하나의 간단한 해결책은 1 개 또는 2 미래의 결과를 받아 들일 BlobKeys을 변경하는 것 (또는 당신은 각 파이프 라인 결과에 대한 서로 다른 키를 사용하여 common.Dict을 사용할 수 있습니다) : 나는 아래에있는 내 파이프 라인에 붙여 넣습니다.

+0

다시 감사합니다. ozarov! BlobKeys 입력은 좋은 아이디어입니다. common.dict에 대해 자세히 설명해 주시겠습니까? 나는 너를 잘 모르겠다. 사전을 사용하여 BlobKeys 출력을 하나의 객체로 "수집"하는 것을 의미합니까? –

+0

Ozarov, BlobKeys를 변경하여 향후 2 가지 결과를 허용하는 방법을 설명 할 수 있습니까? 다음을 시도했지만 아직 작동하지 않습니다. –

+0

클래스 BlobKeys2 (base_handler.PipelineBase) : "" "제공된 키워드 인수가있는 사전을 반환합니다." "" def run (self, key1, key2) : 로깅 .debug ("------------------- BLOBKEYS % s"% keys) ##/blobstore/ 문자열에서 각 키를 제거하십시오. "blob_keys": [key2의 k에 대한 k.split ("/") [- 1]] [[ "blob_keys": [k.split ("/") [key1의 k에 대한 - ] } –