문제는 프로세스 풀에 4 개의 스레드가 있으며 20 개의 스레드에서 기다려야한다는 것입니다. 그래서 원하는 스레드를 수행 할 충분한 스레드가 없도록합니다.
즉, my_function
은 작업자 스레드에서 실행됩니다. 이 스레드는 map이 호출 될 때 차단합니다. 지도에 대한 호출을 실행하기에 덜 유용한 스레드가 하나 있습니다. 선물은이 스레드를 차단합니다.
내 솔루션은 선물을 반환하는 yield
및 yield from
문을 사용하는 것입니다. 그래서 내 솔루션은 선물과 스레드의 차단을 제거하는 것입니다. 모든 미래가 만들어지고 그 다음에 실행이 진행되어 스레드가 해제됩니다. 그런 다음이 스레드는 맵 미래를 실행할 수 있습니다. 앞으로 등록 콜백은 next()
생성 단계를 실행합니다.
이 질문이 먼저 해결되어야하는 객체를 exissting에 프록시 문제를 해결하려면
How to properly set up multiprocessing proxy objects for objects that already exist
그래서 우리는 실행하기 위해 다음과 같은 재귀가 주어진 : [1,[2,[3,3,3],2],1],0,0]
리스트의 재귀 병렬 합.업데이트 된 버전은 여기에서 찾을 수 있습니다
import traceback
from concurrent.futures.thread import *
from concurrent.futures import *
from concurrent.futures._base import *
##import hanging_threads
class RecursiveThreadPoolExecutor(ThreadPoolExecutor):
# updated version here: https://gist.github.com/niccokunzmann/9170072
def _submit(self, fn, *args, **kwargs):
return ThreadPoolExecutor.submit(self, fn, *args, **kwargs)
def submit(self, fn, *args, **kwargs):
"""Submits a callable to be executed with the given arguments.
Schedules the callable to be executed as fn(*args, **kwargs) and returns
a Future instance representing the execution of the callable.
Returns:
A Future representing the given call.
"""
real_future = Future()
def generator_start():
try:
## print('start', fn, args, kwargs)
generator = fn(*args, **kwargs)
## print('generator:', generator)
def generator_next():
try:
## print('next')
try:
future = next(generator)
except StopIteration as stop:
real_future.set_result(stop.args[0])
else:
if future is None:
self._submit(generator_next)
else:
future.add_done_callback(lambda future: generator_next())
except:
traceback.print_exc()
self._submit(generator_next)
## print('next submitted 1')
except:
traceback.print_exc()
self._submit(generator_start)
return real_future
def recursive_map(self, fn, *iterables, timeout=None):
"""Returns a iterator equivalent to map(fn, iter).
Args:
fn: A callable that will take as many arguments as there are
passed iterables.
timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.
Returns:
An iterator equivalent to: map(func, *iterables) but the calls may
be evaluated out-of-order.
Raises:
TimeoutError: If the entire result iterator could not be generated
before the given timeout.
Exception: If fn(*args) raises for any values.
"""
if timeout is not None:
end_time = timeout + time.time()
fs = [self.submit(fn, *args) for args in zip(*iterables)]
# Yield must be hidden in closure so that the futures are submitted
# before the first iterator value is required.
def result_iterator():
yield from fs
return fs
return result_iterator()
if __name__ == '__main__':
def f(args):
executor, tasks = args
print ('tasks:', tasks)
if type(tasks) == int:
return tasks
# waiting for all futures without blocking the thread
futures = yield from executor.recursive_map(f, [(executor, task) for task in tasks])
return sum([future.result() for future in futures])
with RecursiveThreadPoolExecutor(max_workers = 1) as executor:
r = executor.map(f, [(executor, [[1,[2,[3,3,3],2],1],0,0],)] * 1)
import time
time.sleep(0.1)
for v in r:
print('v: {}'.format(v))
:
이
tasks: [[1, [2, [3, 3, 3], 2], 1], 0, 0]
tasks: [1, [2, [3, 3, 3], 2], 1]
tasks: 0
tasks: 0
tasks: 1
tasks: [2, [3, 3, 3], 2]
tasks: 1
tasks: 2
tasks: [3, 3, 3]
tasks: 2
tasks: 3
tasks: 3
tasks: 3
v: 15
이 코드는 여기에 재귀가있는 ThreadPoolExecutor 활성화 소개 :
우리는 다음과 같은 출력을 기대할 수, https://gist.github.com/niccokunzmann/9170072
슬프게도을 I 이제는 다중 처리 작업을 사용하여 프로세스에 대해이를 구현할 수 없습니다. 당신은 그것을 할 수 있고 필요한 것은 오직 generator_start
과 generator_next
함수에 프록시 객체를 만드는 것입니다. 그렇게하는 경우 알려 주시기 바랍니다.
이 문제에 대한 프록시 문제를 해결하려면이 질문에 답변해야합니다. How to properly set up multiprocessing proxy objects for objects that already exist