2013-03-07 6 views
4

코드에 성능 문제가 있습니다. 단계 # IIII 시간을 소모합니다. 전에 itertools.prodct을 구체화하는 데 사용했지만 사용자 덕분에 더 이상 pro_data = product(array_b,array_a)을 수행하지 않습니다. 이것은 메모리 문제를 해결하는데 도움이되었지만 여전히 많은 시간을 필요로합니다. 멀티 스레딩 또는 멀티 프로세싱으로 병렬화하고 싶습니다. 무엇이든 제안 할 수있어서 감사합니다.반복을 통한 루프 평행화

설명. 입자의 x와 y 값을 포함하는 두 개의 배열이 있습니다. 각 입자 (두 좌표로 정의)에 대해 다른 함수로 함수를 계산하려고합니다. 조합을 위해 itertools.product 메서드를 사용하고 모든 입자 위로 반복합니다. 나는 총 50000 개 이상의 입자를 처리하므로 계산할 N * N/2 개의 조합이 있습니다. 사전에

감사

import numpy as np 
import matplotlib.pyplot as plt 
from itertools import product,combinations_with_replacement 

def func(ar1,ar2,ar3,ar4): #example func that takes four arguments 
    return (ar1*ar2**22+np.sin(ar3)+ar4) 

def newdist(a): 
    return func(a[0][0],a[0][1],a[1][0],a[1][1])  

x_edges = np.logspace(-3,1, num=25) #prepare x-axis for histogram 

x_mean = 10**((np.log10(x_edges[:-1])+np.log10(x_edges[1:]))/2) 
x_width=x_edges[1:]-x_edges[:-1] 

hist_data=np.zeros([len(x_edges)-1]) 

array1=np.random.uniform(0.,10.,100) 
array2=np.random.uniform(0.,10.,100) 

array_a = np.dstack((array1,array1))[0] 
array_b = np.dstack((array2,array2))[0] 
# IIII 
for i in product(array_a,array_b): 
    (result,bins) = np.histogram(newdist(i),bins=x_edges) 
    hist_data+=result 

hist_data = np.array(map(float, hist_data)) 
plt.bar(x_mean,hist_data,width=x_width,color='r') 
plt.show() 

----- 편집 ----- 는 지금이 코드를 사용 : 여기

def mp_dist(array_a,array_b, d, bins): #d chunks AND processes 
    def worker(array_ab, out_q): 
     """ push result in queue """ 
     outdict = {} 
     outdict = vec_chunk(array_ab, bins) 
     out_q.put(outdict) 
    out_q = mp.Queue() 
    a = np.swapaxes(array_a, 0 ,1) 
    b = np.swapaxes(array_b, 0 ,1) 
    array_size_a=len(array_a)-(len(array_a)%d) 
    array_size_b=len(array_b)-(len(array_b)%d) 
    a_chunk = array_size_a/d 
    b_chunk = array_size_b/d 
    procs = [] 
    #prepare arrays for mp 
    array_ab = np.empty((4, a_chunk, b_chunk)) 
    for j in xrange(d): 
    for k in xrange(d): 
     array_ab[[0, 1]] = a[:, a_chunk * j:a_chunk * (j + 1), None] 
     array_ab[[2, 3]] = b[:, None, b_chunk * k:b_chunk * (k + 1)] 
     p = mp.Process(target=worker, args=(array_ab, out_q)) 
     procs.append(p) 
     p.start() 
    resultarray = np.empty(len(bins)-1) 
    for i in range(d): 
     resultarray+=out_q.get() 
    # Wait for all worker processes to finish 
    for pro in procs: 
     pro.join() 
    print resultarray 
    return resultarray 

문제는 내가 프로세스의 수를 제어 할 수 있다는 것입니다 . 대신 mp.Pool()을 사용할 수 있습니까? 보다

+0

글을 그대로 업데이트하여 구문/들여 쓰기가 정확하도록 게시물을 업데이트 할 수 있습니까? 몇 줄을 추가하여 array1 및 유사한 변수의 예제를 생성하십시오. 사람들은 당신의 질문에 답하는 데 시간을 할애 할 가능성이 훨씬 적습니다 ... – YXD

+0

나는 몇 배의 스피드 업을 위해서 [Cython] (http://wiki.cython.org/tutorials/numpy)을 살펴 보시기 바랍니다. Python과 Numpy로 병렬 처리하는 것은 그리 쉬운 일이 아닙니다. –

+0

1)'newdist1'이 대칭이면 각 쌍을 한 번만 가져옴으로써 시간을 반으로 줄일 수 있습니다. 2) 여러 프로세스에 걸쳐 작업을 분산시키기 위해'다중 처리 '(다중 처리가 아닌)를 사용할 수 있습니다. 가장 단순한 방법은 각각 프로세스가'i'의 값을 얻고 집계를 유지하는'프로세스 풀 (pool) '을 만드는 것입니다. 그물에 많은 예제가 있으므로, 그냥 사용해보십시오. 3) 그게 충분하지 않다면, 실제로, cython이 있습니다. –

답변

4

벡터화 된 numpy 연산을 사용하십시오. product()의 for-loop를 meshgrid()을 사용하여 인수를 생성하여 newdist() 호출로 바꿉니다.

meshgrid()의 서브 블록에 해당 array_a의 조각, array_b에 문제 컴퓨팅 newdist()을 parallize합니다. Here's an example using slices and multiprocessing. 여기

이 단계를 입증하는 또 다른 예이다 : 파이썬 루프 -> 벡터화 NumPy와 버전 -> 병렬 : npoints = 10_000_000에 대한

#!/usr/bin/env python 
from __future__ import division 
import math 
import multiprocessing as mp 
import numpy as np 

try: 
    from itertools import izip as zip 
except ImportError: 
    zip = zip # Python 3 

def pi_loop(x, y, npoints): 
    """Compute pi using Monte-Carlo method.""" 
    # note: the method converges to pi very slowly. 
    return 4 * sum(1 for xx, yy in zip(x, y) if (xx**2 + yy**2) < 1)/npoints 

def pi_vectorized(x, y, npoints): 
    return 4 * ((x**2 + y**2) < 1).sum()/npoints # or just .mean() 

def mp_init(x_shared, y_shared): 
    global mp_x, mp_y 
    mp_x, mp_y = map(np.frombuffer, [x_shared, y_shared]) # no copy 

def mp_pi(args): 
    # perform computations on slices of mp_x, mp_y 
    start, end = args 
    x = mp_x[start:end] # no copy 
    y = mp_y[start:end] 
    return ((x**2 + y**2) < 1).sum() 

def pi_parallel(x, y, npoints): 
    # compute pi using multiple processes 
    pool = mp.Pool(initializer=mp_init, initargs=[x, y]) 
    step = 100000 
    slices = ((start, start + step) for start in range(0, npoints, step)) 
    return 4 * sum(pool.imap_unordered(mp_pi, slices))/npoints 

def main(): 
    npoints = 1000000 

    # create shared arrays 
    x_sh, y_sh = [mp.RawArray('d', npoints) for _ in range(2)] 

    # initialize arrays 
    x, y = map(np.frombuffer, [x_sh, y_sh]) 
    x[:] = np.random.uniform(size=npoints) 
    y[:] = np.random.uniform(size=npoints) 

    for f, a, b in [(pi_loop, x, y), 
        (pi_vectorized, x, y), 
        (pi_parallel, x_sh, y_sh)]: 
     pi = f(a, b, npoints) 
     precision = int(math.floor(math.log10(npoints))/2 - 1 + 0.5) 
     print("%.*f %.1e" % (precision + 1, pi, abs(pi - math.pi))) 

if __name__=="__main__": 
    main() 

시간 성능 :

pi_loop pi_vectorized pi_parallel 
    32.6   0.159  0.069 # seconds 

그것은 주요 성능 이점이 있음을 보여줍니다 파이썬 루프를 벡터화 된 numpy 아날로그로 변환하는 것.

+0

@Jaime and J.F., 고맙습니다. 그리드에서도 계산할 생각이었습니다. 그러나 서브 샘플이 아닌 배열의 모든 조합에 대해 'newdist'를 계산해야합니다. – madzone

+0

@madzone : 여러 프로세스에서 서로 다른 하위 블록을 계산합니다. 총 가능한 모든 조합이 될 것입니다. – jfs

+0

너 불행히도'meshgrid'를 계산하면 내 기억이 과부하가된다. 그래서 나는'itertools'와'multiprocessing'을 어떻게 사용할 수 있는지에 대한 나의 아이디어에서'product'를 구체화하지 않았던가? – madzone

4

먼저, 문제의 간단한 벡터화를 살펴보십시오. array_aarray_b을 입자의 좌표와 동일하게 지정하겠다고 생각합니다. 그러나 여기서는 입자를 개별적으로 유지하고 있습니다.

나는 쉽게 타이밍을 만들기 위해 함수에 코드를 돌았 다 :

def IIII(array_a, array_b, bins) : 
    hist_data=np.zeros([len(bins)-1]) 
    for i in product(array_a,array_b): 
     (result,bins) = np.histogram(newdist(i), bins=bins) 
     hist_data+=result 
    hist_data = np.array(map(float, hist_data)) 
    return hist_data 

당신은, 그런데, 덜 복잡한 방식으로 샘플 데이터를 생성 할 수 있습니다 다음과 같이

n = 100 
array_a = np.random.uniform(0, 10, size=(n, 2)) 
array_b = np.random.uniform(0, 10, size=(n, 2)) 

그래서 먼저 func을 벡터화해야합니다. 나는 그것을 마치 array 모양의 (4, ...) 걸릴 수 있도록했습니다. 메모리를 절약하기 위해 계산을 수행하고 첫 번째 평면 (예 : array[0])을 반환합니다.

In [2]: h1 = IIII(array_a, array_b, x_edges) 

In [3]: h2 = IIII_bis(array_a, array_b, x_edges) 

In [4]: np.testing.assert_almost_equal(h1, h2) 

그러나 타이밍 차이 : 둘 다 같은를 반환 n = 100 점으로

def IIII_vec(array_a, array_b, bins) : 
    array_ab = np.empty((4, len(array_a), len(array_b))) 
    a = np.swapaxes(array_a, 0 ,1) 
    b = np.swapaxes(array_b, 0 ,1) 
    array_ab[[0, 1]] = a[:, :, None] 
    array_ab[[2, 3]] = b[:, None, :] 
    newdist = func_vectorized(array_ab) 
    hist, _ = np.histogram(newdist, bins=bins) 
    return hist 

: 장소에서이 기능으로

def func_vectorized(a) : 
    a[1] **= 22 
    np.sin(a[2], out=a[2]) 
    a[0] *= a[1] 
    a[0] += a[2] 
    a[0] += a[3] 
    return a[0] 

, 우리는 IIII의 벡터화 버전을 쓸 수 있습니다 이미 매우 관련이 있습니다.

In [5]: %timeit IIII(array_a, array_b, x_edges) 
1 loops, best of 3: 654 ms per loop 

In [6]: %timeit IIII_vec(array_a, array_b, x_edges) 
100 loops, best of 3: 2.08 ms per loop 

300x 속도 향상 !. 당신은 더 이상 샘플 데이터, n = 1000로 다시 시도하면 300X가 남아 있도록, 당신은 n**2, 그들은 그 모두 규모가 똑같이 나쁜 볼 수 있습니다

In [10]: %timeit IIII(array_a, array_b, x_edges) 
1 loops, best of 3: 68.2 s per loop 

In [11]: %timeit IIII_bis(array_a, array_b, x_edges) 
1 loops, best of 3: 229 ms per loop 

그래서 당신은 여전히 ​​좋은 10 분을 찾고 있습니다. 이는 현재 솔루션에 필요한 2 일 이상과 비교할 때 그리 많지는 않습니다.

물론 좋겠다면 (4, 50000, 50000) 수레 배열을 메모리에 넣을 필요가 있습니다. 내 시스템에서는 처리 할 수 ​​없습니다. 하지만 여전히 처리량을 비교적 빠르게 유지할 수 있습니다. 다음 버전의 IIII_vec은 각 배열을 d 청크로 나눕니다. 서면에 따르면 배열의 길이는 d으로 나눌 수 있어야합니다. 그것은 그 한계를 극복하기 위해 너무 열심히 꿀벌 않을 것이다, 그러나 진정한 목적 당황 것 :

In [4]: h1 = IIII_vec(array_a, array_b, x_edges) 

In [5]: h2 = IIII_vec_bis(array_a, array_b, x_edges, d=10) 

In [6]: np.testing.assert_almost_equal(h1, h2) 

그리고 지금 약간의 타이밍을 :

def IIII_vec_bis(array_a, array_b, bins, d=1) : 
    a = np.swapaxes(array_a, 0 ,1) 
    b = np.swapaxes(array_b, 0 ,1) 
    a_chunk = len(array_a) // d 
    b_chunk = len(array_b) // d 
    array_ab = np.empty((4, a_chunk, b_chunk)) 
    hist_data = np.zeros((len(bins) - 1,)) 
    for j in xrange(d) : 
     for k in xrange(d) : 
      array_ab[[0, 1]] = a[:, a_chunk * j:a_chunk * (j + 1), None] 
      array_ab[[2, 3]] = b[:, None, b_chunk * k:b_chunk * (k + 1)] 
      newdist = func_vectorized(array_ab) 
      hist, _ = np.histogram(newdist, bins=bins) 
      hist_data += hist 
    return hist_data 

우선, 정말 작동하는지 확인할 수 있습니다. n = 100으로 :

In [7]: %timeit IIII_vec(array_a, array_b, x_edges) 
100 loops, best of 3: 2.02 ms per loop 

In [8]: %timeit IIII_vec_bis(array_a, array_b, x_edges, d=10) 
100 loops, best of 3: 12 ms per loop 

하지만 청크로 그 일을 메모리에 더 큰 큰 배열을하는 것을 시작으로 돈을 지불하기 시작합니다. n = 1000으로 :

In [12]: %timeit IIII_vec(array_a, array_b, x_edges) 
1 loops, best of 3: 223 ms per loop 

In [13]: %timeit IIII_vec_bis(array_a, array_b, x_edges, d=10) 
1 loops, best of 3: 208 ms per loop 

n = 10000와 나는 더 이상 배열없이 IIII_vec를 호출 할 수는 오류가 너무 큰하지만, 땅딸막 한 버전은 아직 실행 :

In [18]: %timeit IIII_vec_bis(array_a, array_b, x_edges, d=10) 
1 loops, best of 3: 21.8 s per loop 

을 그리고 그냥 할 수있는 것을 보여 끝내자. 나는 한 번 실행했다. n = 50000 :

In [23]: %timeit -n1 -r1 IIII_vec_bis(array_a, array_b, x_edges, d=50) 
1 loops, best of 1: 543 s per loop 

좋은 9 분 숫자 경색의 문제는 25 억 개의 상호 작용을 계산 한 것만 큼 나쁘지 않습니다.

관련 문제