0

나는 ThreadPoolExecutor을 사용 중이며 작업자 스레드가 실패 할 경우를 대비하여 전체 계산을 중단해야합니다.작업자 스레드에서 던져진 예외를 즉시 다시 발생시키는 방법은 무엇입니까?

ThreadPoolExecutor 자동으로 예외를 다시 발생시키지 않습니다 때문에 관계없이 오류 성공을 인쇄 1. 예. .result() 예외를 재 - 제기 때문에

from concurrent.futures import ThreadPoolExecutor 

def task(): 
    raise ValueError 

with ThreadPoolExecutor() as executor: 
    executor.submit(task) 
print('Success') 

예 2이 제대로 메인 스레드를 충돌합니다. 그러나 첫 번째 작업이 완료 될 때까지 기다리므로 주 스레드는 지연과 함께 예외를 경험합니다.

가 어떻게 실패를 처리하고 나머지 근로자를 중단 발생 직후 (다소) 메인 스레드에서 작업자 예외를 알 수

import time 
from concurrent.futures import ThreadPoolExecutor 

def task(should_raise): 
    time.sleep(1) 
    if should_raise: 
     raise ValueError 

with ThreadPoolExecutor() as executor: 
    executor.submit(task, False).result() 
    executor.submit(task, True).result() 
print('Success') 
?

답변

1

첫째, 우리는 결과를 요청하기 전에 작업을 제출해야합니다.그렇지 않으면, 스레드도 병렬로 실행되지 않습니다

exc_info = None 

주 :

futures = [] 
with ThreadPoolExecutor() as executor: 
    futures.append(executor.submit(good_task)) 
    futures.append(executor.submit(bad_task)) 
for future in futures: 
    future.result() 

이제 우리는 주 스레드와 작업자 스레드 모두에 사용할 수있는 변수에 예외 정보를 저장할 수 있습니다 스레드는 정말 그 하위 프로세스를 죽일 수 없어, 그래서 우리는 예외 정보 설정하고 정지 할 때까지 노동자가 확인이 :

def good_task(): 
    global exc_info 
    while not exc_info: 
     time.sleep(0.1) 

def bad_task(): 
    global exc_info 
    time.sleep(0.2) 
    try: 
     raise ValueError() 
    except Exception: 
     exc_info = sys.exc_info() 

모든 스레드가 종료 된 후, 마이를 n thread는 예외 정보를 담고있는 변수를 검사 할 수있다. 채워지는 경우 예외를 다시 제기합니다.

if exc_info: 
    raise exc_info[0].with_traceback(exc_info[1], exc_info[2]) 
print('Success') 
0

는 내가 그렇게 그것을 구현하는 것, 생각 :

  1. 하나의 예외를보고,
  2. 하나는 취소를 통지하기 :

    내가 주 과정, 나는이 대기열을 만들 수 있습니다.

::

import multiprocessing as mp 

error_queue = mp.Queue() 
cancel_queue = mp.Queue() 

나는 각 ThreadPoolExecutor를 작성하고 매개 변수로이 대기열을 전달합니다.

class MyExecutor(concurrent.futures.ThreadPoolExecutor): 
    def __init__(self, error_queue, cancel_queue): 
     self.error_queue : error_queue 
     self.cancel_queue = cancel_queue 

ThreadPoolExecutor에는 메인 루프가 있습니다. 이 루프에서는 먼저 "취소"메시지를 사용할 수 있는지 확인하기 위해 cancel_queue을 검사합니다.

메인 루프에서는 예외 관리자도 구현합니다. 주요 과정에서

self.status = "running" 
with True: # <- or something else 
    if not self.cancel_queue.empty(): 
     self.status = "cancelled" 
     break 
    try: 
     # normal processing 
     ... 
    except Exception as exc: 
     # you can log the exception here for debug 
     self.error_queue.put(exc) 
     self.status = "error" 
     break 
    time.sleep(.1) 

:

실행 모든 MyExecutor 예를 erreur가 발생하면, 나는 예외를 발생.

가 error_queue 검사 : 모든

while True: 
    if not error_queue.empty(): 
     cancel_queue.put("cancel") 
    time.sleep(.1) 
관련 문제