1

목록이있는 프로그램이 있습니다. 이 목록의 각 값에 대해 다른 목록을 검색하고이 다른 목록을 처리합니다.Future 내에서 ProcessPoolExecutor를 사용할 수 있습니까?

기본적으로 각 노드에서 값 비싼 처리를 수행해야하는 3 깊이 트리입니다.

각 노드는 하위 노드의 결과를 처리 할 수 ​​있어야합니다.

내가 할 수있는 일은 list의 첫 번째 레이어 입력에서 각 노드의 결과로 map입니다. 이 각각의 과정에서, 나는 다음 층의 결과가 map이 되길 원합니다.

내가 걱정하는 것은 각 레이어마다 고유 한 최대 근로자 수가 있다는 것입니다. 가능한 경우 프로세스 풀을 공유하고 싶습니다. 그렇지 않으면 모든 프로세스 전환에 대한 성능 적중률이 있습니다.

concurrency.futures 또는 다른 방법을 사용하여 각 레이어가 동일한 프로세스 풀을 공유하도록 할 방법이 있습니까?

예제는 다음과 같습니다 이런 식으로

def main(): 
    my_list = [1,2,3,4] 
    with concurrent.futures.ProcessPoolExecutor(max_workers = 4) as executor: 
     results = executor.map(my_function, zip(my_list, [executor] * len(my_list))) 
     #process results 

def my_function(args): 
    list = args[0] 
    executor = args[1] 
    new_list = process(list) 
    results = executor.map(second_function, new_list) 
    #process results 
    #return processed results 

def second_function(values): 
    ... 

는, 각각의 자식 프로세스는 동일한 풀에서 그릴 것입니다.

또는, 나는 (그러나 정확히)

import concurrent.futures.ProcessPoolExecutor(max_workers = 4) as executor

같은 일을하고 같은 프로세스 풀에서 executor 풀 각 호출을 할 수 있습니까?

답변

1

문제는 프로세스 풀에 4 개의 스레드가 있으며 20 개의 스레드에서 기다려야한다는 것입니다. 그래서 원하는 스레드를 수행 할 충분한 스레드가 없도록합니다.

즉, my_function은 작업자 스레드에서 실행됩니다. 이 스레드는 map이 호출 될 때 차단합니다. 지도에 대한 호출을 실행하기에 덜 유용한 스레드가 하나 있습니다. 선물은이 스레드를 차단합니다.

내 솔루션은 선물을 반환하는 yieldyield from 문을 사용하는 것입니다. 그래서 내 솔루션은 선물과 스레드의 차단을 제거하는 것입니다. 모든 미래가 만들어지고 그 다음에 실행이 진행되어 스레드가 해제됩니다. 그런 다음이 스레드는 맵 미래를 실행할 수 있습니다. 앞으로 등록 콜백은 next() 생성 단계를 실행합니다.

이 질문이 먼저 해결되어야하는 객체를 exissting에 프록시 문제를 해결하려면 How to properly set up multiprocessing proxy objects for objects that already exist

그래서 우리는 실행하기 위해 다음과 같은 재귀가 주어진 : [1,[2,[3,3,3],2],1],0,0]리스트의 재귀 병렬 합.업데이트 된 버전은 여기에서 찾을 수 있습니다

import traceback 
from concurrent.futures.thread import * 
from concurrent.futures import * 
from concurrent.futures._base import * 
##import hanging_threads 

class RecursiveThreadPoolExecutor(ThreadPoolExecutor): 

    # updated version here: https://gist.github.com/niccokunzmann/9170072 

    def _submit(self, fn, *args, **kwargs): 
     return ThreadPoolExecutor.submit(self, fn, *args, **kwargs) 

    def submit(self, fn, *args, **kwargs): 
     """Submits a callable to be executed with the given arguments. 

     Schedules the callable to be executed as fn(*args, **kwargs) and returns 
     a Future instance representing the execution of the callable. 

     Returns: 
      A Future representing the given call. 
     """ 
     real_future = Future() 
     def generator_start(): 
      try: 
##    print('start', fn, args, kwargs) 
       generator = fn(*args, **kwargs) 
##    print('generator:', generator) 
       def generator_next(): 
        try: 
##      print('next') 
         try: 
          future = next(generator) 
         except StopIteration as stop: 
          real_future.set_result(stop.args[0]) 
         else: 
          if future is None: 
           self._submit(generator_next) 
          else: 
           future.add_done_callback(lambda future: generator_next()) 
        except: 
         traceback.print_exc() 
       self._submit(generator_next) 
##    print('next submitted 1') 
      except: 
       traceback.print_exc() 
     self._submit(generator_start) 
     return real_future 

    def recursive_map(self, fn, *iterables, timeout=None): 
     """Returns a iterator equivalent to map(fn, iter). 

     Args: 
      fn: A callable that will take as many arguments as there are 
       passed iterables. 
      timeout: The maximum number of seconds to wait. If None, then there 
       is no limit on the wait time. 

     Returns: 
      An iterator equivalent to: map(func, *iterables) but the calls may 
      be evaluated out-of-order. 

     Raises: 
      TimeoutError: If the entire result iterator could not be generated 
       before the given timeout. 
      Exception: If fn(*args) raises for any values. 
     """ 
     if timeout is not None: 
      end_time = timeout + time.time() 

     fs = [self.submit(fn, *args) for args in zip(*iterables)] 

     # Yield must be hidden in closure so that the futures are submitted 
     # before the first iterator value is required. 
     def result_iterator(): 
      yield from fs 
      return fs 
     return result_iterator() 

if __name__ == '__main__': 

    def f(args): 
     executor, tasks = args 
     print ('tasks:', tasks) 
     if type(tasks) == int: 
      return tasks 
     # waiting for all futures without blocking the thread 
     futures = yield from executor.recursive_map(f, [(executor, task) for task in tasks]) 
     return sum([future.result() for future in futures]) 

    with RecursiveThreadPoolExecutor(max_workers = 1) as executor: 
     r = executor.map(f, [(executor, [[1,[2,[3,3,3],2],1],0,0],)] * 1) 
     import time 
     time.sleep(0.1) 

     for v in r: 
      print('v: {}'.format(v)) 

:

tasks: [[1, [2, [3, 3, 3], 2], 1], 0, 0] 
tasks: [1, [2, [3, 3, 3], 2], 1] 
tasks: 0 
tasks: 0 
tasks: 1 
tasks: [2, [3, 3, 3], 2] 
tasks: 1 
tasks: 2 
tasks: [3, 3, 3] 
tasks: 2 
tasks: 3 
tasks: 3 
tasks: 3 
v: 15 

이 코드는 여기에 재귀가있는 ThreadPoolExecutor 활성화 소개 :

우리는 다음과 같은 출력을 기대할 수, https://gist.github.com/niccokunzmann/9170072

슬프게도을 I 이제는 다중 처리 작업을 사용하여 프로세스에 대해이를 구현할 수 없습니다. 당신은 그것을 할 수 있고 필요한 것은 오직 generator_startgenerator_next 함수에 프록시 객체를 만드는 것입니다. 그렇게하는 경우 알려 주시기 바랍니다.

이 문제에 대한 프록시 문제를 해결하려면이 질문에 답변해야합니다. How to properly set up multiprocessing proxy objects for objects that already exist

관련 문제