2013-08-24 4 views
18

여기에 프로그램의 이걸 없애 버려? 2.7로 maxtasksperchild를 시도했지만 도움이되지 않았습니다.메모리 사용량이 파이썬의 multiprocessing.pool로 성장을 계속

나는 다른 모든 요인을 피하기 위해 이미 6G + RES를 얻었으며 ~ 1.5M 지점에서 apply_async() ~ 6M 번을 호출하는 더 복잡한 프로그램을 가지고 있으며 위의 버전으로 프로그램을 단순화했습니다.

편집 : 모든 사람의 입력

,이 버전이 더 잘 작동 밖으로 켜지고 감사 : 나는 주요 프로세스가 단일 스레드입니다 생각으로 내가 거기에있는 잠금 장치를 두지 않았다

#!/usr/bin/python 

import multiprocessing 

ready_list = [] 
def dummy_func(index): 
    global ready_list 
    ready_list.append(index) 

def worker(index): 
    return index 

if __name__ == '__main__': 
    pool = multiprocessing.Pool(processes=16) 
    result = {} 
    for index in range(0,1000000): 
     result[index] = (pool.apply_async(worker, (index,), callback=dummy_func)) 
     for ready in ready_list: 
      result[ready].wait() 
      del result[ready] 
     ready_list = [] 

    # clean up 
    pool.close() 
    pool.join() 

(콜백입니다 필자가 읽은 문서 당 이벤트 중심의 작업과 비슷합니다.)

v1의 색인 범위를 v2와 동일하게 변경하여 몇 가지 테스트를 수행했습니다. v2가 v1 (33s 대 37s)보다 10 % 더 빠르다고해도 이상한 일입니다. v1이 너무 많은 내부 목록 유지 관리 작업을 수행하고있을 수도 있습니다. v2는 300M (VIRT)과 50M (RES)을 넘지 못했으며, v1은 370M/120M이었고, 최고는 330M/85M이었습니다. 모든 수치는 단지 3 ~ 4 배의 테스트로 참조 용이었습니다. map_async 대신 apply_async

+1

그냥 여기에 추측하지만 백만 개의 개체를 큐에 넣으면 공간이 필요합니다. 아마도 일괄 처리하는 것이 도움이 될 것입니다. 문서는 명확하지 않지만 [예제] (http://pydoc.net/Python/multiprocessing/2.6.2.1/multiprocessing.examples.mp_pool/) (Testing 콜백 검색)에서는 apply_async 결과가 대기중인 것을 알 수 있습니다. 콜백이 있습니다. 결과 대기열을 지우려면 대기가 필요할 수 있습니다. – tdelaney

+0

그래서 callback은 실제로 정리 작업을 수행하지 않으므로 multiprocessing.pool이 올바른 도구가 아닐 수 있습니다. 콜백에서 정리가 가능합니까? 문제는 실제 작업자()가 요청 당 0.1 초 (여러 HTTP 요청)가 걸리므로 apply_async() 호출을 기다릴 수 없다는 것입니다. –

+1

야생 추측 :'apply_asynch'는 ['AsynchResult'] (http://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.AsyncResult) 인스턴스를 만듭니다. 'Pool'은 계산이 끝났을 때 결과를 반환 할 수 있어야하기 때문에 이러한 객체에 대한 참조를 가지고있을 것입니다. 그러나 루프에서 단순히 던져 버릴뿐입니다. 아마도 어쩌면'apply_asynch'의'callback' 인자를 사용하여 어떤 시점에서 비동기 결과에 대해'get()'또는'wait()'를 호출해야 할 것입니다. – Bakuriu

답변

6

를 사용하여 과도한 메모리 사용을 방지 할 수 있습니다.

귀하의 첫 번째 예를 들어, 다음 두 줄을 변경 :

for index in range(0,100000): 
    pool.apply_async(worker, callback=dummy_func) 

pool.map_async(worker, range(100000), callback=dummy_func) 

에 그것은 깜짝 할 사이에 완료됩니다 당신이 top에서의 메모리 사용량을 볼 수 있습니다 전에. 그 차이를보기 위해 목록을 더 큰 것으로 변경하십시오. 그러나 map_async__len__ 메서드가없는 경우 길이를 계산하기 위해 먼저 전달한 반복 가능 문자를 목록으로 변환합니다. 엄청난 수의 요소가있는 반복자가있는 경우 itertools.islice을 사용하여 더 작은 청크로 처리 할 수 ​​있습니다.

나는 훨씬 더 많은 데이터와 실제 프로그램에서 메모리 문제가 있었다 마침내 범인이 apply_async 알게되었다.

추신, 메모리 사용과 관련하여 두 가지 예는 명백한 차이가 없습니다.

4

내가 처리하고있어 매우 큰 차원 포인트 클라우드 데이터 세트가 있습니다. 처리 속도를 높이기 위해 다중 처리 모듈을 사용해 보았지만 메모리 오류가 발생하기 시작했습니다. 몇 가지 연구와 테스트를 마친 후에는 대기열을 채우고 서브 프로세스가 비울 때보 다 훨씬 빨리 처리 할 수 ​​있다고 판단했습니다. 청킹, 또는 map_async 또는 뭔가를 사용하여 부하를 조정할 수 있지만, 주변 논리를 크게 변경하고 싶지는 않습니다.

내가 맞힌 멍청한 해결책은 간헐적으로 pool._cache 길이를 확인하고 캐시가 너무 큰 경우 대기열이 비워질 때까지 기다리는 것입니다.메모리 1G가에서 사용에 대해 1 개 백만 작업 (대기보다 더가있는 경우

# Update status 
count += 1 
if count%10000 == 0: 
    sys.stdout.write('.') 
    if len(pool._cache) > 1e6: 
     print "waiting for cache to clear..." 
     last.wait() # Where last is assigned the latest ApplyResult 

그래서 수영장에 모든 10,000 삽입 내가 확인 : 내 돌이에서

나는 이미 카운터와 상태 시세가 있었다 메인 프로세스). 대기열이 가득 차면 마지막으로 삽입 된 작업이 완료 될 때까지 대기합니다.

내 프로그램이 메모리가 부족한 상태에서 몇 시간 동안 실행될 수 있습니다. 주 프로세스는 작업자가 계속 데이터를 처리하는 동안 때때로 일시 중지됩니다.

은 BTW _cache 멤버는 멀티 프로세싱 모듈 풀 예제를 설명되어 있습니다

나는 그것이 산란 프로세스를 유지 있도록 멀티 기능 여러 번 사용하고 그들을 떠난 이후 나는 최근에 메모리 문제가 있었다
# 
# Check there are no outstanding tasks 
# 

assert not pool._cache, 'cache = %r' % pool._cache 
15

기억.

여기에 내가 지금 사용하고 솔루션의 :

def myParallelProcess(ahugearray) 
from multiprocessing import Pool 
from contextlib import closing 
with closing(Pool(15)) as p: 
    res = p.imap_unordered(simple_matching, ahugearray, 100) 
return res 

내가

+2

이 문제를 해결 한 후 내 문제가 해결되었습니다! 고마워요! 루프 내부에 풀을 생성하여 너무 많은 프로세스를 생성 시켰습니다. 각 프로세스는 너무 많은 메모리를 소비하고 결코 종료하지 않았습니다. 방금 루프의 끝에서 mypool.close()를 할 필요가있었습니다. – MohamedEzz

1

와 나는이 the question I posted 유사하다 생각하지만, 난 당신이 같은 지연이 확실하지 않다. 내 문제는 내가 멀티 프로세스 풀에서 결과를 생성하는 속도가 내가 소비 한 것보다 빠르기 때문에 메모리에 축적 된 결과였다. 이를 방지하기 위해 입력 값을 풀로 조절하기 위해 semaphore을 사용 했으므로 소모하는 출력보다 너무 많이 앞서 가지 않았습니다.

0

루프 내에서 풀을 만들고 루프 끝에서 닫으십시오 ( pool.close()).

관련 문제