1
지금까지 Google App Engine에 3 개의 MapReduce 작업을 연결했습니다. 각 MapReduce 작업 사이에서 출력을 산출하고 그 출력을 다음 MapReduce 작업으로 전달합니다. 이제는 서로 다른 MapReduce 작업의 두 가지 출력을 단일 MapReduce 작업으로 전달하고 싶습니다. 누구든지이 작업을 수행하는 방법을 제안 할 수 있습니까?Google App Engine에서 세 번째 MapReduce 작업 (여러 개의 BlobKey/출력 전달)을 MapReduce 작업에 전달하기
num_shards=2
# First define the parent pipeline job
class RecommenderPipeline(base_handler.PipelineBase):
"""A pipeline to run Recommender demo.
Args:
blobkey: blobkey to process as string. Should be a zip archive with
text files inside.
"""
def run(self, filekey, blobkey, itr):
logging.debug("------------------PIPELINE FILEKEY is %s" % filekey)
logging.debug("------------------PIPELINE BLOBKEY is %s" % blobkey)
output1 = yield mapreduce_pipeline.MapreducePipeline(
"recommender",
"main.recommender_group_by_user_rating_map1",
"main.recommender_count_ratings_user_freq_reduce1",
"mapreduce.input_readers.BlobstoreLineInputReader",
"mapreduce.output_writers.BlobstoreOutputWriter",
mapper_params={
"blob_keys": blobkey,
},
reducer_params={
"mime_type": "text/plain",
},
shards=num_shards)
output1yield = yield BlobKeys(output1)
# Code below takes output1 and feeds into second mapreduce job.
# Pipeline library ensures that the second pipeline depends on first and
# does not launch until the first has resolved.
output2 = (
yield mapreduce_pipeline.MapreducePipeline(
"recommender",
"main.recommender_pairwise_items_map2",
"main.recommender_calc_similarity_reduce2",
"mapreduce.input_readers.BlobstoreLineInputReader",
"mapreduce.output_writers.BlobstoreOutputWriter",
mapper_params=(output1yield), #see BlobKeys Class!`
reducer_params={
"mime_type": "text/plain",
},
shards=num_shards))
output2yield = yield BlobKeys(output2)
# Code below takes output2 and feeds into third mapreduce job.
# Pipeline library ensures that the third pipeline depends on second and
# does not launch until the second has resolved.
output3 = yield mapreduce_pipeline.MapreducePipeline(
"recommender",
"main.recommender_weighted_sum_map3",
"main.recommender_weighted_sum_reduce3",
"mapreduce.input_readers.BlobstoreLineInputReader",
"mapreduce.output_writers.BlobstoreOutputWriter",
mapper_params=(output2yield), #see BlobKeys Class!`
reducer_params={
"mime_type": "text/plain",
},
shards=num_shards)
yield StoreOutput("Recommender", filekey, output3, itr) #stores key to results
다시 감사합니다. ozarov! BlobKeys 입력은 좋은 아이디어입니다. common.dict에 대해 자세히 설명해 주시겠습니까? 나는 너를 잘 모르겠다. 사전을 사용하여 BlobKeys 출력을 하나의 객체로 "수집"하는 것을 의미합니까? –
Ozarov, BlobKeys를 변경하여 향후 2 가지 결과를 허용하는 방법을 설명 할 수 있습니까? 다음을 시도했지만 아직 작동하지 않습니다. –
클래스 BlobKeys2 (base_handler.PipelineBase) : "" "제공된 키워드 인수가있는 사전을 반환합니다." "" def run (self, key1, key2) : 로깅 .debug ("------------------- BLOBKEYS % s"% keys) ##/blobstore/ 문자열에서 각 키를 제거하십시오. "blob_keys": [key2의 k에 대한 k.split ("/") [- 1]] [[ "blob_keys": [k.split ("/") [key1의 k에 대한 - ] } –