2013-09-24 4 views
0

아마존의 EMR에서 mrjob을 실행하려고합니다. 인라인 러너를 사용하여 로컬에서 작업을 테스트했지만 Amazon에서 실행하면 실패합니다. 나는 외부 데이터 파일 zip_codes.txt에 의존하여 실패를 좁혔다. 하드 코드 된 우편 번호 데이터를 사용하여 종속성없이 실행하면 정상적으로 작동합니다.EMR에서 mrjob에 데이터 파일을 포함시키는 방법은 무엇입니까?

업로드 파일 인수를 사용하여 필요한 데이터 파일을 포함하려고했습니다. S3에서 보면 파일이 그 파일을 만들었지 만 명확하게 뭔가 잘못되어 로컬로 액세스 할 수 없습니다.

enter image description here

여기 내 mrjob.conf 파일입니다

runners: 
    emr: 
    aws_access_key_id: FOOBARBAZQUX 
    aws_secret_access_key: IAMASECRETKEY 
    aws_region: us-east-1 
    ec2_key_pair: mapreduce 
    ec2_key_pair_file: $ENV/keys/mapreduce.pem 
    ssh_tunnel_to_job_tracker: true 
    ssh_tunnel_is_open: true 
    cleanup_on_failure: ALL 
    cmdenv: 
     TZ: America/Los_Angeles 

이 내 MR_zip.py 파일입니다. zip_codes.txt의 모습은 어디

python MR_zip.py -r emr --conf mrjob.conf --file zip_codes.txt < poi.txt 

:

from mrjob.job import MRJob 
import mrjob 
import csv 

def distance(p1, p2): 
    # d = ...  
    return d 

class MR_zip(MRJob): 
    OUTPUT_PROTOCOL = mrjob.protocol.JSONProtocol 
    zip_codes = {int(zip_code): (float(latitude), float(longitude)) for zip_code, latitude, longitude in csv.reader(open("zip_codes.txt", "r"))} 

    def mapper(self, _, line): 
     zip_code_1, poi = line.split(",") 
     zip_code_1 = int(zip_code_1) 
     lat1, lon1 = self.zip_codes[zip_code_1] 
     for zip_code_2, (lat2, lon2) in self.zip_codes.items(): 
      d = distance((lat1, lon1), (lat2, lon2)) 
      yield zip_code_2, (zip_code_1, poi, d) 

    def reducer(self, zip_code_1, ds): 
     result = {} 
     for zip_code_2, poi, d in ds: 
      if poi not in result: 
       result[poi] = (zip_code_2, d) 
      elif result[poi][1] > d: 
       result[poi] = (zip_code_2, d) 
     yield zip_code_1, result 

if __name__ == '__main__': 
    MR_zip.run() 

그리고 마지막으로, 나는 다음과 같은 명령을 실행

... 
62323,39.817702,-90.66923 
62324,39.988988,-90.94976 
62325,40.034398,-91.16278 
62326,40.421857,-90.80333 
... 

그리고 poi.txt은 다음과 같습니다

... 
210,skate park 
501,theatre 
29001,theatre 
8001,knitting club 
20101,food bank 
... 

답변

1

Overv iew

내 코드에 두 개의 오류가 발생했습니다 :

  1. 기본적으로
  2. 은 EMR 다른 일
  3. 을 사이에 사전 함축을 배제 파이썬 2.6 사용하는 단계에 대한 초기화 코드는 단계의 초기화에 있어야 이제까지

단계 초기화

y 단계는 해당 이니셜 라이저 메소드를가집니다. 예를 들어 mapper은 매퍼에서 사용되는 데이터를 초기화하는 데 사용할 수있는 mapper_init입니다. reducercombiner 함수에는 유사한 초기화 메소드가 있습니다. steps 함수를 사용하여 자신의 단계를 정의한 다음 사용하는 초기화 함수를 정의 할 수도 있습니다. 초기화 도구에 대한 자세한 내용은 here을 참조하십시오.

파이썬 버전 오늘로

조심, EMR은 기본적으로 파이썬 버전 2.6.6을 사용합니다. 따라서 이후 버전의 모든 종속 항목은 로컬로 실행될 수 있지만 EMR에 문제가 있습니다.

위의 코드를 복구

픽스는 MR_zip.py

zip_codes = {int(zip_code): (float(latitude), float(longitude)) for zip_code, latitude, longitude in csv.reader(open("zip_codes.txt", "r"))} 

zip_codes을 정의하는 선을 제거하고 대신에 사전 지능형를 사용하지 않고 mapper_init의 내측을 정의하는 것이 필요하다.

def mapper_init(self): 
    self.zip_codes = {} 
    for zip_code, latitude, longitude in csv.reader(open("zip_codes.txt", "r")): 
     self.zip_codes[int(zip_code)] = (float(latitude), float(longitude)) 

다른 파일과 명령 줄은 동일하게 유지됩니다.

3

또한 유용한 MRJob.add_file_option 루틴을 찾을 수 있습니다. 예를 들어

self.add_file_option('--config-file', dest='config_file', 
    default=None, help='file with labels', action="append") 

을 지정하면 self.options.config_file 경로 목록을 통해 업로드 된 파일을 참조 할 수 있습니다.

관련 문제