Dask 및 Distributed을 사용하여 데이터 분석 파이프 라인을 개발하는 데 많은 성공을 거두고 있습니다. 그러나 여전히 개선을 기대하고있는 한 가지는 예외 처리 방식입니다.Dask에서 예외를 처리하는 방법
는 지금, 내가 쓰는
def my_function (value):
return 1/value
results = (dask.bag
.from_sequence(range(-10, 10))
.map(my_function))
print(results.compute())
을 다음 ... 그때는 역 추적 (근로자 당 하나, 내가 추측하고있어)의 길고 긴 목록을 프로그램을 실행합니다. 여기
distributed.utils - ERROR - division by zero
Traceback (most recent call last):
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/distributed/utils.py", line 193, in f
result[0] = yield gen.maybe_future(func(*args, **kwargs))
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1015, in run
value = future.result()
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/concurrent.py", line 237, in result
raise_exc_info(self._exc_info)
File "<string>", line 3, in raise_exc_info
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1021, in run
yielded = self.gen.throw(*exc_info)
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/distributed/client.py", line 1473, in _get
result = yield self._gather(packed)
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1015, in run
value = future.result()
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/concurrent.py", line 237, in result
raise_exc_info(self._exc_info)
File "<string>", line 3, in raise_exc_info
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1021, in run
yielded = self.gen.throw(*exc_info)
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/distributed/client.py", line 923, in _gather
st.traceback)
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/six.py", line 685, in reraise
raise value.with_traceback(tb)
File "/mnt/lustrefs/work/aurelien.mazurie/test_dask/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/dask/bag/core.py", line 1411, in reify
File "test.py", line 9, in my_function
return 1/value
ZeroDivisionError: division by zero
되는 가장 중요한 부분은 물론, 육안 검사에서 오류가 0으로 번호를 나눈 것을 말해 것이다. 이 오류를 추적하는 더 좋은 방법이 있다면 궁금합니다. 예를 들어, 나는 예외 자체를 잡을 수있을 것 없습니다
import dask.bag
import distributed
try:
dask_scheduler = "127.0.0.1:8786"
dask_client = distributed.Client(dask_scheduler)
def my_function (value):
return 1/value
results = (dask.bag
.from_sequence(range(-10, 10))
.map(my_function))
#dask_client.persist(results)
print(results.compute())
except Exception as e:
print("error: %s" % e)
편집 : 내 예제에서 나는 이 아닌 DASK을을 분산 사용하고 있습니다. dask-scheduler
은 4 개의 dask-worker
프로세스가 등록 된 포트 8786에서 수신 대기합니다.
이 코드는 위에서 설명한 것과 똑같은 출력을 생성합니다. 즉, 실제로는 try
/except
블록을 사용하여 예외를 catch하지 않습니다.
이제 클러스터 전체에 분산 된 작업에 대해 이야기하고 있으므로 분명히 예외를 나에게 전파하는 것은 쉽지 않습니다. 그렇게하기위한 지침이 있습니까?
def my_function (value):
try:
return {"result": 1/value, "error": None}
except ZeroDivisionError:
return {"result": None, "error": "boom!"}
results = (dask.bag
.from_sequence(range(-10, 10))
.map(my_function))
dask_client.persist(results)
errors = (results
.pluck("error")
.filter(lambda x: x is not None)
.compute())
print(errors)
results = (results
.pluck("result")
.filter(lambda x: x is not None)
.compute())
print(results)
이 작동하지만 내가 여기 sandblasting the soup cracker 해요 궁금 해요 : 지금 내 솔루션은 기능 결과 및 선택적 오류 메시지를 모두 반환하는 것입니다, 다음 개별적으로 결과 및 오류 메시지를 처리합니다. EDIT : 다른 옵션은 Maybe
모나드 같은 것을 사용하는 것이지만, 다시 한번 나는 그것을 과소 평가했는지 알고 싶습니다.
그것은 흥미로운 부분이다. 이것은 정확히 동일한 코드를 실행하는 동작이 아니며, (키와) 차이점은'distributed'를 사용하고'Client' 객체를 사용하여 내 작업을 제출한다는 것입니다. 예외를 다시 발생시키는 동작은 ** Dask **에서만 처리되지만 ** distributed **에서는 처리되지 않을 수 있습니까? 추가 정보로, 작업자는 원격 계산 클러스터에 있습니다. – ajmazurie
분산 된 스케줄러에 연결하는 방법을 포함하도록 답변을 업데이트했습니다. 결과는 같습니다. – MRocklin