2011-09-05 4 views
11

나는 256x256x256 Numpy 배열을 가지고 있는데, 각 요소는 매트릭스이다. 이 행렬 각각에 대해 몇 가지 계산을 수행해야하며, 속도를 높이기 위해 multiprocessing 모듈을 사용하고 싶습니다. 원래의 배열 요소 [i,j,k]에서 매트릭스의 결과가 새로운 배열의 요소 [i,j,k] 넣어해야 때문에 이러한 계산itertools와 다중 처리를 결합 하시겠습니까?

결과는 원래 같은 256x256x256 어레이에 저장되어야한다.

이렇게하려면 pseudo-ish 방식으로 [array[i,j,k], (i, j, k)]으로 작성하여 "다중 처리"되는 함수에 전달할 수있는 목록을 만들고 싶습니다. matrices은 원래 배열과 myfunc에서 추출 된 모든 행렬의 목록은 계산을 수행하는 기능이다라고 가정 코드는 다음과 같이 다소 보일 것이다

import multiprocessing 
import numpy as np 
from itertools import izip 

def myfunc(finput): 
    # Do some calculations... 
    ... 

    # ... and return the result and the index: 
    return (result, finput[1]) 

# Make indices: 
inds = np.rollaxis(np.indices((256, 256, 256)), 0, 4).reshape(-1, 3) 

# Make function input from the matrices and the indices: 
finput = izip(matrices, inds) 

pool = multiprocessing.Pool() 
async_results = np.asarray(pool.map_async(myfunc, finput).get(999999)) 

그러나 map_async 실제로이 만드는 것 같아 거대한 finput -list first : CPU 사용량은 많지 않지만 메모리와 스왑은 몇 초 만에 완전히 소모됩니다. 이는 분명 내가 원하는 것이 아닙니다.

명시 적으로 먼저 만들 필요없이이 거대한 목록을 다중 처리 기능에 전달할 수있는 방법이 있습니까? 또는이 문제를 해결하는 다른 방법을 알고 있습니까?

감사합니다. :-)

+1

'map_async()'에서'get()'을 사용하고 있기 때문에 * 비동기 * 연산을 원하지 않으므로 대신'Pool.map()'을 사용해야합니다. –

+0

아마도이 문제를 제대로 이해하지 못했지만 imap 또는 imap_unordered를 고려 했습니까? –

답변

10

모든 multiprocessing.Pool.map* 메서드는 함수가 호출 되 자마자 완전히 반복기를 사용합니다. (demo code). 한 번에 반복자 한 덩어리의지도 기능의 덩어리를 공급하려면 grouper_nofill를 사용

def grouper_nofill(n, iterable): 
    '''list(grouper_nofill(3, 'ABCDEFG')) --> [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']] 
    ''' 
    it=iter(iterable) 
    def take(): 
     while 1: yield list(itertools.islice(it,n)) 
    return iter(take().next,[]) 

chunksize=256 
async_results=[] 
for finput in grouper_nofill(chunksize,itertools.izip(matrices, inds)): 
    async_results.extend(pool.map_async(myfunc, finput).get()) 
async_results=np.array(async_results) 

PS합니다. pool.map_asyncchunksize 매개 변수는 다른 것을 수행합니다. iterable을 청크로 분할 한 다음 각 청크를 map(func,chunk)을 호출하는 작업자 프로세스에 전달합니다. 이렇게하면 func(item)이 너무 빨리 끝나면 작업자가 더 많은 데이터를 처리 할 수 ​​있지만 map_async 호출이 발행 된 직후 반복기가 여전히 완전히 소모되기 때문에 상황에 도움이되지 않습니다.

+0

대단히 감사합니다! 귀하의 솔루션이 실제로 작동하는 것 같습니다! 참고로 pool.map_async (myfunc, finput) .get (999999)을 사용해야했지만 작동했습니다! 그러나, 그것은 여전히 ​​정확한 chunkksize에 의존하는 많은 메모리를 사용하며, 파이썬은 실행 중에 가비지 수집하지 않는 것 같습니다. 그게 왜 어떤 아이디어일까요? – digitaldingo

+0

@digitaldingo : 흠, 아무것도 생각 나지 않습니다. 코드를 [SSCCE] (http://sscce.org/)로 옮겨서 여기에 게시 할 수 있다면 이상적입니다. – unutbu

0

Pool.map_async()은 작업을 여러 작업자로 파견하는 데 필요한 반복 가능 시간을 알아야합니다. izip에는 __len__이 없으므로 iterable을 먼저 목록으로 변환하여 사용중인 거대한 메모리 사용을 유발합니다.

__len__으로 자신의 izip- 스타일 반복기를 만들어이 문제를 회피하려고 할 수 있습니다.

+0

왜 그 사실을 알아야합니까?왜 모든 유휴 노동자와 대기자에게 먹이를 줄 수 없습니까? –

+0

@andrew -'map_async()'('multiprocessing/pool.py')의 첫번째 줄은 실제로 hasattr (iterable, '__len__')이 아니라면 : iterable = list (iterable)'입니다. 작업자의 완료 순서를 알 수 없으므로 충분히 큰 출력 목록을 작성하려면 길이를 알아야합니다. –

+0

흠. 동적으로 구성 할 수 있습니까? 나는 이것이 문제로 제기되었을지도 모른다고 생각하고 있습니다. 그것은 유효한 요청처럼 보입니다. –

2

이 문제도 발생했습니다. 이 대신에 :

res = p.map(func, combinations(arr, select_n)) 

그것을 소비하지 않는

res = p.imap(func, combinations(arr, select_n)) 

IMAP을!

관련 문제