2014-10-09 1 views
0

동일한 반복 동안 작업자 풀에 인수를 동적으로 추가하는 방법을 찾고 있습니다. 그래서, 이들 중 일부가 실패 할 경우 즉시 재 처리 할 수 ​​있습니다.동적으로 인수를 작업자 풀에 추가하십시오.

from numpy import random 
from multiprocessing import Pool 
from time import sleep 

def foo(x): 
    sleep(0.1) 
    # 50% chance to have a fault 
    return x, x if random.rand() > 0.5 else -1 

random.seed(3)  # seed 
pool = Pool(2)  # process 
args = range(5)  # arguments to process 

for i,(id,x) in enumerate(pool.imap(foo, args)): 
    print i,x 
    if x != -1: 
     args.remove(id) 

print args 

출력은

0 0 
1 1 
2 2 
3 3 
4 -1 
[4] 

하지만 나는 그것이 같은 반복 내에서

0 0 
1 1 
2 2 
3 3 
4 -1 
5, 4 
[] 

싶습니다. 다시 말하면 반복이 완료되면 동일한 풀의 직원에게 새로운지도를 작성하고 싶지 않습니다. 새로운 인수를 직접 푸시하고 싶습니다. 따라서 첫 번째 반복에서 실패하므로 사용 가능한 프로세스를 사용하기 전에 끝까지 기다릴 필요가 없습니다! 간단하게 위의 내 문제는 "foo는"함수를 완료하는 데 약 20 분 정도 소요되며이 동시에 실행 24 개 과정에 걸쳐 퍼져 : 나는

업데이트 ... 그것은 이해 바랍니다. 한 프로세스가 실패하면 가능한 한 빨리 재 처리해야합니다. 사용 가능한 리소스가있을 때 20 분을 기다리지 않으려 고합니다.

+0

에 날아가 값 : '<>'연산자 * * 사용할 수 없습니다. 그것은 파이썬 2.0의 * deprecated * 이후 python2의 사마귀이고 * python3 +에서 제거되었습니다 *. 이 코드를 사용하면 * 1) 아무도이 코드를 사용하지 않으므로 코드를 다른 사람이 읽을 수 없게 만든다. 2) 코드의 이식성을 떨어 뜨린다. 나는 보통'! ='대신에 그것을 사용할 이유가 없다. – Bakuriu

+0

변경됨 - 나도 싫어 해요! –

+0

내가 아는 한, 실행중인 풀에 새로운 작업을 추가 할 수 없습니다. 질문에서 정확히 무엇을하려하는지 알 수 없습니다. 프로세스가 어떤 인수로 실패하면, 해당 인수를 재 시도하거나 실패를 기록한 다음 다음 단계로 넘어 가고 싶습니까? 의도 한 결과에 따라 동일한 인수를 사용하여 다시 시도하려는 것으로 추측되지만 설명이 도움이 될 것입니다. –

답변

1

내가 아는 한, 현재 실행중인 Pool에 작업을 추가 할 수 없습니다 (경쟁 조건이나 정의되지 않은 동작을 만들지 않고 현재보고있는 것처럼). 다행히 성공한 작업이 완료 될 때까지 실패한 작업을 다시 시도해야하기 때문에 실제로는 이 에 무엇이든 추가하려면이 필요하지 않습니다. 매핑 된 함수을 원하는 방식으로 수정하면됩니다.

def foo(x): 
    sleep(0.1) 
    # 50% chance to have a fault 
    return x, x if random.rand() > 0.5 else -1 

def successful_foo(x): 
    '''Version of the foo(x) function that never fails.''' 

    result = -1 
    while result == -1: 
     result = foo(x) 
    return result 

이제 pool.imap(successful_foo, args) 수 있으며, 모든 프로세스가 성공적으로 완료 (또는 영원히 실행)되므로 안심. 그것이 영원히 달릴 수 있고 몇몇 시도 또는 어떤 클립의 양 후에 중단하는 선택권을 원하는 경우에, 다만 적합한 카운터 또는 타이머로 while 루프를 교환하십시오.


물론 많은 데모가 아닌 경우에는 실패를 나타내는 특별한 반환 값을 갖는 것이 비현실적입니다. 그 상황에서, 나는 당신이 발생할 수있는 예측 오류의 종류를 처리하기 위해 전문 Exception를 사용하는 것을 선호 :

class FooError(BaseException): 
    pass 

def foo(x): 
    sleep(0.1) 
    # 50% chance to have a fault 
    if random.rand() > 0.5: # fault condition 
     raise FooError('foo had an error!') 
    return x, x 

def successful_foo(x): 
    '''Version of the foo(x) function that never fails.''' 

    while True: 
     try: 
      return foo(x) 
     except FooError as e: 
      pass # Log appropriately here; etc. 
+0

쉬운 - 많은 감사합니다! –

0

수 없습니다. 반복하는 동안 변경 가능한 목록을 수정하려는 경우 이 작동하지 않음을 알고 있습니다.. 출력은 remove 항목이 목록을 구성 할 때 목록의 길이가 1으로 줄어들고 제거한 항목 이후의 모든 항목을 하나의 색인으로 이동하기 때문입니다. 즉, 뒤에 오는 항목은으로 건너 뜁니다.

문제는 multiprocessing 자체와는 아무 상관이없는,하지만 일반 목록과 : 루프가 짝수 값 이상 을 반복 결코 이상한 값을 본

In [1]: def f(x): 
    ...:  print(x) 
    ...:  

In [2]: args = [0, 1, 2, 3, 4, 5] 

In [3]: for i, x in enumerate(args): 
    ...:  print(i, x) 
    ...:  if x % 2 == 0: 
    ...:   args.remove(x) 
    ...:   
0 0 
1 2 
2 4 

In [4]: args 
Out[4]: [1, 3, 5] 

참고.

이 제거되는 항목을 추적 할 만 루프의 끝에서 그렇게 : 아마

to_be_removed = [] 
for i, (ident, x) in enumerate(pool.imap(foo, args)): 
    print(i, x) 
    if x != -1: 
     to_be_removed.append(ident) 

for ident in to_be_removed: 
    args.remove(ident) 

또는 더 효율적으로, 당신은 set을 사용할 수 args 목록을 재 구축 :

to_be_removed = set() 
for i, (ident, x) in enumerate(pool.imap(foo, args)): 
    print(i, x) 
    if x != -1: 
     to_be_removed.add(ident) 

args = [el for el in args if el not in to_be_removed] 

이것은 이전 솔루션의 2 차 시간 대신 선형 시간이 걸립니다.


당신은 내가이하는 생각 때문에 그러나 나는이 정말 multiprocessing와 함께 일하는 것이 확실하지 않다, 모든 반복에 대해 어떤 요소를 생산하기로 임의의 복잡한 의사 결정을 할 수있는 사용자 정의 반복자을 만들도 만들 수 있습니다

하나씩 항목을 소비하지 않습니다 (그렇지 않으면 병렬 처리 할 수 ​​없음). 따라서 예상 한대로 수정 사항이 실제로 표시 될 것이라는 어떠한 보증도 제공 할 수 없습니다.

또한 버그는 버그를 묻는 것입니다.

+0

"args.remove (id)"대신 "args.append (id)"를 시도했는데, 이는 내가 목록에 무엇인가를 추가 할 때마다 하나씩 반복을 증가시켜야한다는 것을 의미합니다. 그것도 작동하지 않습니다. 위의 내 문제는 실제로 지나치게 단순화됩니다. "foo"기능은 완료하는 데 약 20 분이 걸리고 동시에 실행 된 24 개의 프로세스에 분산되어 있으며 한 프로세스가 실패하면 곧 가능한 한 빨리 다시 처리해야합니다. –

+0

@AlessandroMariani 나는 ** args.append를 사용하여 iteration을 하나씩 증가시켜야한다고 말하지 않았다. 내가 말하고자하는 것은 반복하는 동안 목록의 크기를 수정하면 * 정의되지 않은 동작 *이 발생한다는 것입니다. 사실, 당신이'dict'에 그렇게하려고한다면, 당신은 자동적으로 요소들을 건너 뛰는 대신에 예외를 얻을 것이다. 목록 반복에 의존 할 수는 없습니다. 보증을 원한다면 완벽하게 사용자 정의 된 반복자를 사용하여 직접 빌드해야합니다. – Bakuriu

+0

@AlessandroMariani 또한 'Pool'의 경우 풀은 한 번에 하나의 요소를 반복하지 않습니다. * k * 작업 청크로 목록을 나눠서 다른 프로세스로 보냅니다. 바꾸어 말하면, 당신이하는 수정은 노동자들과 경쟁 조건이 있습니다. 이 작업을 의도대로 작동하게하려면 프로세스 잠금을 사용해야합니다. 그러면 병렬 처리가 줄어들고 다중 처리의 목적이 무효화됩니다. 어쨌든 내 대답이 당신의 원래 질문에 대답했다고 믿습니다. 풀을 사용하여 복잡한 작업을 파견하는 것에 대한 새로운 질문이 생기면 새 풀을 열어야합니다. – Bakuriu

관련 문제