2016-09-21 1 views
8

MxN 배열을 렌더링하는 함수가 정의되어 있습니다. 배열은 매우 거대하므로 나는 다중 처리/스레딩을 사용하여 동시에 작은 배열 (M1xN, M2xN, M3xN --- MixN.M1 + M2 + M3 + --- Mi = M)을 생성하는 함수를 사용하고자합니다. 이 배열을 결합하여 mxn 배열을 만듭니다. 씨 Boardrider가 정당하게 실행 가능한 예제를 제공하기 위해 제안으로, 예를 들어 다음은 광범위하게 내가 더 많은 시간이 걸릴 것입니다 xy 증가 시스템의 길이로멀티 프로세싱/스레딩을 사용하여 numpy 배열 연산을 청크로 나누기

import numpy as n 
def mult(y,x): 
    r = n.empty([len(y),len(x)]) 
    for i in range(len(r)): 
     r[i] = y[i]*x 
    return r 
x = n.random.rand(10000) 
y = n.arange(0,100000,1) 
test = mult(y=y,x=x) 

을 할 의도 무엇을 전달한다. 이 예제와 관련하여이 코드를 실행하여 코어가 4 개인 경우 각 작업의 1/4을 수행 할 수 있습니다. 즉, 요소를 계산하기 위해 작업을 수행합니다. r[0] ~ r[24999] ~ 1 코어 r[25000] ~ r[49999] 제 3 코어에 대해서는 r[50000] ~ r[74999], 제 4 코어에 대해서는 r[75000] ~ r[99999]이된다. 결국 클럽 결과를 추가하여 하나의 배열 r[0]r[99999]에 추가하십시오.

이 예제가 명확하게 보이기를 바랍니다. 내 문제가 여전히 명확하지 않으면 알려주십시오. 말을

+1

[mcve]는 어떻습니까? – boardrider

+0

멀티 스레드/프로세스 인 경우에도 numpy의 내부 브로드 캐스팅 메커니즘보다 더 빨리 파이썬에서 뭔가를 프로그래밍하지 않을 것입니다 ... numpy가 내부적으로 수행하도록하십시오 – Aaron

+0

다중 스레드/프로세스를 사용하지 않도록 조심하십시오 . 엄청난 양의 데이터에 대해 적은 양의 작업을 수행하면 CPU가 메모리 버스 속도 (CPU의 캐시 등과 비교하면 느리다)에 의해 방해 받게됩니다. 따라서 알고리즘이 I/O 바운드이면 더 많은 스레드를 추가해도 속도가 향상되지 않습니다. – bazza

답변

6

우선이다 : 그것은 동일한 프로세서에 여러 개의 코어에 대해 있다면, numpy이에서

(multiplication of large arrays in python의 설명을 참조) 이미 작업을 병렬 우리가 이제까지 손으로 할 수있는 것보다 더 잘 할 수있다 당신이 실제로 분리 된 여러 개의 CPU를 통해이 확산하고 싶었

test2 = x[n.newaxis, :] * y[:, n.newaxis] 

n.abs(test - test2).max() # verify equivalence to mult(): output should be 0.0, or very small reflecting floating-point precision limitations 

[, 즉 다른입니다 : 경우 키는 단순히 곱셈이 모든 오히려 파이썬 for -loop보다 도매 배열 작업에서 수행되도록하는 것 문제는 있지만 질문은 단일 (멀티 코어) CPU를 제안]


확인을 염두에 위의 베어링 :.의 방금 mult()보다 작업이 더 복잡 병렬 처리한다고 가정하자. numpy이 병렬 처리 할 수있는 도매 배열 연산으로 연산을 최적화하려고했지만 연산이 문제가되지 않는다고 가정 해 봅시다. 이 경우 lock=Falsemultiprocessing.Pool으로 생성 된 공유 메모리 multiprocessing.Array을 사용하여 겹치지 않는 청크를 처리하고 y 차원 (그리고 원하는 경우 x 이상)으로 나뉘어 진 프로세스를 할당 할 수 있습니다. 예제 목록이 아래에 나와 있습니다. 이 접근법은 명시 적으로 명시 적으로 정확하게 수행하지는 않습니다 (결과를 정리하고 단일 배열에 추가). 대신 여러 프로세스가 공유 메모리의 중첩되지 않는 부분에서 응답의 해당 부분을 어셈블합니다. 일단 완료되면 콜레 션/추가가 필요하지 않습니다. 결과를 읽습니다.

import os, numpy, multiprocessing, itertools 

SHARED_VARS = {} # the best way to get multiprocessing.Pool to send shared multiprocessing.Array objects between processes is to attach them to something global - see http://stackoverflow.com/questions/1675766/ 

def operate(slices): 
    # grok the inputs 
    yslice, xslice = slices 
    y, x, r = get_shared_arrays('y', 'x', 'r') 
    # create views of the appropriate chunks/slices of the arrays: 
    y = y[yslice] 
    x = x[xslice] 
    r = r[yslice, xslice] 
    # do the actual business 
    for i in range(len(r)): 
     r[i] = y[i] * x # If this is truly all operate() does, it can be parallelized far more efficiently by numpy itself. 
         # But let's assume this is a placeholder for something more complicated. 

    return 'Process %d operated on y[%s] and x[%s] (%d x %d chunk)' % (os.getpid(), slicestr(yslice), slicestr(xslice), y.size, x.size) 

def check(y, x, r): 
    r2 = x[numpy.newaxis, :] * y[:, numpy.newaxis] # obviously this check will only be valid if operate() literally does only multiplication (in which case this whole business is unncessary) 
    print('max. abs. diff. = %g' % numpy.abs(r - r2).max()) 
    return y, x, r 

def slicestr(s): 
    return ':'.join('' if x is None else str(x) for x in [s.start, s.stop, s.step]) 

def m2n(buf, shape, typecode, ismatrix=False): 
    """ 
    Return a numpy.array VIEW of a multiprocessing.Array given a 
    handle to the array, the shape, the data typecode, and a boolean 
    flag indicating whether the result should be cast as a matrix. 
    """ 
    a = numpy.frombuffer(buf, dtype=typecode).reshape(shape) 
    if ismatrix: a = numpy.asmatrix(a) 
    return a 

def n2m(a): 
    """ 
    Return a multiprocessing.Array COPY of a numpy.array, together 
    with shape, typecode and matrix flag. 
    """ 
    if not isinstance(a, numpy.ndarray): a = numpy.array(a) 
    return multiprocessing.Array(a.dtype.char, a.flat, lock=False), tuple(a.shape), a.dtype.char, isinstance(a, numpy.matrix) 

def new_shared_array(shape, typecode='d', ismatrix=False): 
    """ 
    Allocate a new shared array and return all the details required 
    to reinterpret it as a numpy array or matrix (same order of 
    output arguments as n2m) 
    """ 
    typecode = numpy.dtype(typecode).char 
    return multiprocessing.Array(typecode, int(numpy.prod(shape)), lock=False), tuple(shape), typecode, ismatrix 

def get_shared_arrays(*names): 
    return [m2n(*SHARED_VARS[name]) for name in names] 

def init(*pargs, **kwargs): 
    SHARED_VARS.update(pargs, **kwargs) 

if __name__ == '__main__': 

    ylen = 1000 
    xlen = 2000 

    init(y=n2m(range(ylen))) 
    init(x=n2m(numpy.random.rand(xlen))) 
    init(r=new_shared_array([ylen, xlen], float)) 

    print('Master process ID is %s' % os.getpid()) 

    #print(operate([slice(None), slice(None)])); check(*get_shared_arrays('y', 'x', 'r')) # local test 

    pool = multiprocessing.Pool(initializer=init, initargs=SHARED_VARS.items()) 
    yslices = [slice(0,333), slice(333,666), slice(666,None)] 
    xslices = [slice(0,1000), slice(1000,None)] 
    #xslices = [slice(None)] # uncomment this if you only want to divide things up in the y dimension 
    reports = pool.map(operate, itertools.product(yslices, xslices)) 
    print('\n'.join(reports)) 
    y, x, r = check(*get_shared_arrays('y', 'x', 'r'))