2017-02-28 2 views
4

DaskDistributed을 사용하여 데이터 분석 파이프 라인을 개발하는 데 많은 성공을 거두고 있습니다. 그러나 여전히 개선을 기대하고있는 한 가지는 예외 처리 방식입니다.Dask에서 예외를 처리하는 방법

는 지금, 내가 쓰는

def my_function (value): 
    return 1/value 

results = (dask.bag 
    .from_sequence(range(-10, 10)) 
    .map(my_function)) 

print(results.compute()) 

을 다음 ... 그때는 역 추적 (근로자 당 하나, 내가 추측하고있어)의 길고 긴 목록을 프로그램을 실행합니다. 여기

distributed.utils - ERROR - division by zero 
Traceback (most recent call last): 
    File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/distributed/utils.py", line 193, in f 
    result[0] = yield gen.maybe_future(func(*args, **kwargs)) 
    File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1015, in run 
    value = future.result() 
    File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/concurrent.py", line 237, in result 
    raise_exc_info(self._exc_info) 
    File "<string>", line 3, in raise_exc_info 
    File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1021, in run 
    yielded = self.gen.throw(*exc_info) 
    File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/distributed/client.py", line 1473, in _get 
    result = yield self._gather(packed) 
    File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1015, in run 
    value = future.result() 
    File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/concurrent.py", line 237, in result 
    raise_exc_info(self._exc_info) 
    File "<string>", line 3, in raise_exc_info 
    File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1021, in run 
    yielded = self.gen.throw(*exc_info) 
    File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/distributed/client.py", line 923, in _gather 
    st.traceback) 
    File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/six.py", line 685, in reraise 
    raise value.with_traceback(tb) 
    File "/mnt/lustrefs/work/aurelien.mazurie/test_dask/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/dask/bag/core.py", line 1411, in reify 
    File "test.py", line 9, in my_function 
    return 1/value 
ZeroDivisionError: division by zero 

되는 가장 중요한 부분은 물론, 육안 검사에서 오류가 0으로 번호를 나눈 것을 말해 것이다. 이 오류를 추적하는 더 좋은 방법이 있다면 궁금합니다. 예를 들어, 나는 예외 자체를 잡을 수있을 것 없습니다

import dask.bag 
import distributed 

try: 
    dask_scheduler = "127.0.0.1:8786" 
    dask_client = distributed.Client(dask_scheduler) 

    def my_function (value): 
     return 1/value 

    results = (dask.bag 
     .from_sequence(range(-10, 10)) 
     .map(my_function)) 

    #dask_client.persist(results) 

    print(results.compute()) 

except Exception as e: 
    print("error: %s" % e) 

편집 : 내 예제에서 나는 이 아닌 DASK을 분산 사용하고 있습니다. dask-scheduler은 4 개의 dask-worker 프로세스가 등록 된 포트 8786에서 수신 대기합니다.

이 코드는 위에서 설명한 것과 똑같은 출력을 생성합니다. 즉, 실제로는 try/except 블록을 사용하여 예외를 catch하지 않습니다.

이제 클러스터 전체에 분산 된 작업에 대해 이야기하고 있으므로 분명히 예외를 나에게 전파하는 것은 쉽지 않습니다. 그렇게하기위한 지침이 있습니까?

def my_function (value): 
    try: 
     return {"result": 1/value, "error": None} 
    except ZeroDivisionError: 
     return {"result": None, "error": "boom!"} 

results = (dask.bag 
    .from_sequence(range(-10, 10)) 
    .map(my_function)) 

dask_client.persist(results) 

errors = (results 
    .pluck("error") 
    .filter(lambda x: x is not None) 
    .compute()) 

print(errors) 

results = (results 
    .pluck("result") 
    .filter(lambda x: x is not None) 
    .compute()) 

print(results) 

이 작동하지만 내가 여기 sandblasting the soup cracker 해요 궁금 해요 : 지금 내 솔루션은 기능 결과 및 선택적 오류 메시지를 모두 반환하는 것입니다, 다음 개별적으로 결과 및 오류 메시지를 처리합니다. EDIT : 다른 옵션은 Maybe 모나드 같은 것을 사용하는 것이지만, 다시 한번 나는 그것을 과소 평가했는지 알고 싶습니다.

답변

0

Dask는 원격으로 발생한 예외를 자동으로 패키지하고 로컬에서 다시 발생시킵니다. 여기에 내가 당신의 모범을 실행할 때 얻을 수있는 것이있다.

In [1]: from dask.distributed import Client 

In [2]: client = Client('localhost:8786') 

In [3]: import dask.bag 

In [4]: try: 
    ...:  def my_function (value): 
    ...:   return 1/value 
    ...: 
    ...:  results = (dask.bag 
    ...:   .from_sequence(range(-10, 10)) 
    ...:   .map(my_function)) 
    ...: 
    ...:  print(results.compute()) 
    ...: 
    ...: except Exception as e: 
    ...:  import pdb; pdb.set_trace() 
    ...:  print("error: %s" % e) 
    ...:  
distributed.utils - ERROR - division by zero 
> <ipython-input-4-17aa5fbfb732>(13)<module>() 
-> print("error: %s" % e) 
(Pdb) pp e 
ZeroDivisionError('division by zero',) 
+0

그것은 흥미로운 부분이다. 이것은 정확히 동일한 코드를 실행하는 동작이 아니며, (키와) 차이점은'distributed'를 사용하고'Client' 객체를 사용하여 내 작업을 제출한다는 것입니다. 예외를 다시 발생시키는 동작은 ** Dask **에서만 처리되지만 ** distributed **에서는 처리되지 않을 수 있습니까? 추가 정보로, 작업자는 원격 계산 클러스터에 있습니다. – ajmazurie

+0

분산 된 스케줄러에 연결하는 방법을 포함하도록 답변을 업데이트했습니다. 결과는 같습니다. – MRocklin

관련 문제