아마존의 EMR에서 mrjob을 실행하려고합니다. 인라인 러너를 사용하여 로컬에서 작업을 테스트했지만 Amazon에서 실행하면 실패합니다. 나는 외부 데이터 파일 zip_codes.txt
에 의존하여 실패를 좁혔다. 하드 코드 된 우편 번호 데이터를 사용하여 종속성없이 실행하면 정상적으로 작동합니다.EMR에서 mrjob에 데이터 파일을 포함시키는 방법은 무엇입니까?
업로드 파일 인수를 사용하여 필요한 데이터 파일을 포함하려고했습니다. S3에서 보면 파일이 그 파일을 만들었지 만 명확하게 뭔가 잘못되어 로컬로 액세스 할 수 없습니다.
여기 내 mrjob.conf
파일입니다
runners:
emr:
aws_access_key_id: FOOBARBAZQUX
aws_secret_access_key: IAMASECRETKEY
aws_region: us-east-1
ec2_key_pair: mapreduce
ec2_key_pair_file: $ENV/keys/mapreduce.pem
ssh_tunnel_to_job_tracker: true
ssh_tunnel_is_open: true
cleanup_on_failure: ALL
cmdenv:
TZ: America/Los_Angeles
이 내 MR_zip.py
파일입니다. zip_codes.txt의 모습은 어디
python MR_zip.py -r emr --conf mrjob.conf --file zip_codes.txt < poi.txt
:
from mrjob.job import MRJob
import mrjob
import csv
def distance(p1, p2):
# d = ...
return d
class MR_zip(MRJob):
OUTPUT_PROTOCOL = mrjob.protocol.JSONProtocol
zip_codes = {int(zip_code): (float(latitude), float(longitude)) for zip_code, latitude, longitude in csv.reader(open("zip_codes.txt", "r"))}
def mapper(self, _, line):
zip_code_1, poi = line.split(",")
zip_code_1 = int(zip_code_1)
lat1, lon1 = self.zip_codes[zip_code_1]
for zip_code_2, (lat2, lon2) in self.zip_codes.items():
d = distance((lat1, lon1), (lat2, lon2))
yield zip_code_2, (zip_code_1, poi, d)
def reducer(self, zip_code_1, ds):
result = {}
for zip_code_2, poi, d in ds:
if poi not in result:
result[poi] = (zip_code_2, d)
elif result[poi][1] > d:
result[poi] = (zip_code_2, d)
yield zip_code_1, result
if __name__ == '__main__':
MR_zip.run()
그리고 마지막으로, 나는 다음과 같은 명령을 실행
...
62323,39.817702,-90.66923
62324,39.988988,-90.94976
62325,40.034398,-91.16278
62326,40.421857,-90.80333
...
그리고 poi.txt은 다음과 같습니다
...
210,skate park
501,theatre
29001,theatre
8001,knitting club
20101,food bank
...