2012-02-15 5 views
6

MapReduce, MRJob 용 Yelp의 Python API를 사용하는 방법을 배우려고합니다. 그들의 간단한 단어 카운터 예제는 의미가 있지만 여러 입력을 포함하는 응용 프로그램을 처리하는 방법이 궁금합니다. 예를 들어 문서의 단어를 단순히 계산하는 것이 아니라 벡터에 행렬을 곱하는 것입니다. 나는이 솔루션 기능을 함께했다,하지만 바보 느낌 :MRJob을 이용한 다중 입력

이 코드는 ./matrix.py < input.txt를 실행하고 작동하는 이유는
class MatrixVectMultiplyTast(MRJob): 
    def multiply(self,key,line): 
      line = map(float,line.split(" ")) 
      v,col = line[-1],line[:-1] 

      for i in xrange(len(col)): 
        yield i,col[i]*v 

    def sum(self,i,occurrences): 
      yield i,sum(occurrences) 

    def steps(self): 
      return [self.mr (self.multiply,self.sum),] 

if __name__=="__main__": 
    MatrixVectMultiplyTast.run() 

그에 대응하는 벡터 값, 열을 기준으로 input.txt를 저장 행렬 라인의 끝.

그래서, 다음의 행렬과 벡터 :

enter image description here

이 같은 input.txt를로 표시됩니다 : 한마디로

enter image description here

, 내가 매트릭스를 저장에 대해 가서 얼마나 벡터를 자연스럽게 별도의 파일에 저장하고 둘 다 MRJob에 전달하면됩니까?

답변

3

다른에 대한 원시 데이터를 처리하는 필요에 있다면 (또는 같은 row_i, row_j) 데이터 세트, 당신도 할 수 있습니다

1) 데이터의 복사본을 저장하는 S3 버킷을 만듭니다. 이 사본의 위치를 ​​작업 클래스 (예 : 아래 코드에서 self.options.bucket 및 self.options.my_datafile_copy_location. 주의 사항 : 안타깝게도 파일을 처리하기 전에 전체 파일을 작업 컴퓨터에 "다운로드"해야하는 것으로 보입니다. 연결이 느려지거나로드하는 데 너무 오래 걸리면이 작업이 실패 할 수 있습니다. 다음은이를 수행 할 파이썬/MRJob 코드입니다. 당신의 매퍼 기능에

넣어이 :

d1 = line1.split('\t', 1) 
v1, col1 = d1[0], d1[1] 
conn = boto.connect_s3(aws_access_key_id=<AWS_ACCESS_KEY_ID>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>) 
bucket = conn.get_bucket(self.options.bucket) # bucket = conn.get_bucket(MY_UNIQUE_BUCKET_NAME_AS_STRING) 
data_copy = bucket.get_key(self.options.my_datafile_copy_location).get_contents_as_string().rstrip() 
### CAVEAT: Needs to get the whole file before processing the rest. 
for line2 in data_copy.split('\n'): 
    d2 = line2.split('\t', 1) 
    v2, col2 = d2[0], d2[1] 
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here: 
    yield <your output key, value pairs> 
conn.close() 

2)는 SimpleDB의 도메인을 만들고, 거기에 모든 데이터를 저장합니다. BOTO 및 SimpleDB는 여기에 읽기 : http://code.google.com/p/boto/wiki/SimpleDbIntro

귀하의 매퍼 코드는 다음과 같습니다

dline = dline.strip() 
d0 = dline.split('\t', 1) 
v1, c1 = d0[0], d0[1] 
sdb = boto.connect_sdb(aws_access_key_id=<AWS_ACCESS_KEY>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>) 
domain = sdb.get_domain(MY_DOMAIN_STRING_NAME) 
for item in domain: 
    v2, c2 = item.name, item['column'] 
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here: 
    yield <your output key, value pairs> 
sdb.close() 

당신은 매우 많은 양의 데이터가있는 경우는 요청을 할 수 있기 때문에이 두 번째 옵션은 더 잘 수행 할 수 한 번에 전체 금액이 아닌 각 데이터 행마다. SimpleDB 값은 최대 1024 자까지만 가능하므로 데이터 값이 더 길면 몇 가지 방법으로 압축/압축을 풀어야 할 수도 있습니다.

1

예를 들어 로컬 파일에서 실행을 사용하는 경우에도 Amazon에서 Hadoop 서비스 또는 Hadoop 서비스를 활용하려는 경우가 아니라면 MrJob을 사용하지 않을 것입니다.

MrJob은 원칙적으로 "Hadoop streaming"을 사용하여 작업을 제출합니다.

즉, Hadoop의 파일 또는 폴더로 지정된 모든 입력이 매퍼로 스트리밍되고 그 결과가 감속기로 전달됨을 의미합니다. 모든 매퍼는 입력 슬라이스를 얻고 모든 입력을 개략적으로 동일하게 간주하여 각 데이터 슬라이스의 키 값을 균일하게 구문 분석하고 처리합니다.

이 이해에서 파생 된 입력은 개략적으로 매퍼와 동일합니다.두 개의 서로 다른 설계도 데이터를 포함 할 수있는 유일한 방법은 매퍼가 벡터 데이터와 매트릭스 데이터를 이해할 수있는 방식으로 동일한 파일에 인터리브하는 것입니다.

행이 행렬 데이터인지 또는 벡터 데이터인지를 지정함으로써 간단히 향상시킬 수 있습니다. 벡터 데이터를 보면 선행 행렬 데이터가 벡터 데이터에 적용됩니다.

matrix, 1, 2, ... 
matrix, 2, 4, ... 
vector, 3, 4, ... 
matrix, 1, 2, ... 
..... 

그러나 앞서 언급 한 프로세스가 잘 작동합니다. 모든 설계도 데이터를 단일 파일로 가져와야합니다.

그래도 여전히 문제가 있습니다. K, V map reduce는 전체 스키마가 단일 행에 있고 완전한 단일 처리 단위를 포함 할 때 더 잘 작동합니다.

내 이해에서 이미 올바르게하고 있지만 Map-Reduce가 이러한 종류의 데이터에 적합한 메커니즘이 아님을 알 수 있습니다. 내가 할 수있는 것보다 더 명확하게 해줄 수 있기를 바랍니다.

2

질문에 대한 실제 답변은 mrjob이 map_input_file 환경 변수 (map.input.file 속성을 노출 함)를 읽음으로써 hadoop 스트리밍 조인 패턴을 지원하지 않는다는 것입니다. 경로 및/또는 이름을 기반으로 처리합니다.

이 문서에 표시 될 때 당신은 여전히 ​​당신은 쉽게 그냥이 속한 입력 데이터 자체를 읽고 검색 할 수 있다면, 그것을 해낼 수있을 : 그건 아니다 그러나

http://allthingshadoop.com/2011/12/16/simple-hadoop-streaming-tutorial-using-joins-and-keys-with-python/

항상 가능합니다 ...

그렇지 않으면 myjob이 환상적으로 보이고 앞으로도이 기능을 추가 할 수 있기를 바랍니다. 그 때까지 이것은 나를 위해 꽤 많은 거래 차단기입니다.

1

이것은 여러 입력을 사용하고 파일 이름을 기반으로 매퍼 단계에서 적절한 변경을하는 방법입니다.

러너 프로그램 :

from mrjob.hadoop import * 


#Define all arguments 

os.environ['HADOOP_HOME'] = '/opt/cloudera/parcels/CDH/lib/hadoop/' 
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME'))) 
job_running_time = datetime.datetime.now().strftime('%Y-%m-%d_%H_%M_%S') 
hadoop_bin = '/usr/bin/hadoop' 
mode = 'hadoop' 
hs = HadoopFilesystem([hadoop_bin]) 

input_file_names = ["hdfs:///app/input_file1/","hdfs:///app/input_file2/"] 

aargs = ['-r',mode,'--jobconf','mapred.job.name=JobName','--jobconf','mapred.reduce.tasks=3','--no-output','--hadoop-bin',hadoop_bin] 
aargs.extend(input_file_names) 
aargs.extend(['-o',output_dir]) 
print aargs 
status_file = True 

mr_job = MRJob(args=aargs) 
with mr_job.make_runner() as runner: 
    runner.run() 
os.environ['HADOOP_HOME'] = '' 
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME'))) 

MRJob 클래스 :

class MR_Job(MRJob): 
    DEFAULT_OUTPUT_PROTOCOL = 'repr_value' 
    def mapper(self, _, line): 
    """ 
    This function reads lines from file. 
    """ 
    try: 
     #Need to clean email. 
     input_file_name = get_jobconf_value('map.input.file').split('/')[-2] 
       """ 
       Mapper code 
       """ 
    except Exception, e: 
     print e 

    def reducer(self, email_id,visitor_id__date_time): 
    try: 
     """ 
       Reducer Code 
       """ 
    except: 
     pass 


if __name__ == '__main__': 
    MRV_Email.run()