2016-10-27 4 views
4

저는 파이썬의 multiprocessing 모듈과 공유 메모리를 가지고 놀고 있습니다. Process과 공유 메모리 객체를 사용할 수 있지만 Pool과 공유 메모리 객체를 사용할 수 없습니다. 내 Pool 콜백을 추가하고 콜백이 호출되지 않는 것 같습니다.Pool을 사용하여 공유 메모리 객체를 변경할 수 없습니다.

from multiprocessing import Array, Pool, Process 

def flip(x,a): 
    a[x] = 0 if a[x] else 1 
    return (x, a[x]) 

def cb(result): 
    print(result) 

if __name__ == '__main__': 

    # size of array 
    N = 10 

    # shared array - N bytes - unsynchronized - initialized to zeros 
    a = Array('B', N, lock=False) 

    # flip values to ones using Process 
    processes = [Process(target=flip, args=(x, a)) for x in range(N)] 
    for p in processes: p.start() 
    for p in processes: p.join() 
    print([a[i] for i in range(N)])  

    # flip values back to zeros using Pool 
    pool = Pool(processes=4) 
    for x in range(N): 
     pool.apply_async(flip, args=(x, a), callback=cb) 
    pool.close() 
    pool.join() 
    print([a[i] for i in range(N)]) 

나는 내 공유 배열 callback 인쇄하고 다시 배열 모두 0 단일 선 다음, 모두 1 회 인쇄 얻을 것으로 기대하지만, 대신를 얻을 것;

[1, 1, 1, 1, 1, 1, 1, 1, 1, 1] 
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1] 

Pool은 작업을 실행하지 않는 이유는 무엇입니까?

최소한의 예제를 위해 공유 메모리를 사용합니다.

def f(x): 
    return x 

def cb(result): 
    print('cb',result) 

if __name__ == '__main__': 

    pool = Pool(processes=4) 
    pool.apply_async(f, range(10), callback=cb) 
    pool.close() 
    pool.join() 

나는 0에서 9까지의 숫자를 별도의 줄에 인쇄 할 것을 기대하지만 아무 것도 출력하지 않습니다.

바로 위의 apply_sync 호출을이 코드로 바꿉니다.

pool.apply_async(f, args=[10], callback=cb) 

나는 range(10)[1,2,3]는, [(1),(2),(3)]가, 또는 ([1],[2],[3]) 더 출력을 얻을 수없는 [10]로 교체 출력

cb 10 

를 얻을.

+1

(x) (range) : pool.apply .... 플립을 4 번 적용 하시겠습니까? 1 - 0 - 1 - 0 - 1 (1로 끝남) – chapelo

+0

@chapelo - 4는 풀에있는 작업자 수입니다. 'for '는해야 할 10 가지 작업을 만들어야합니다. 'cb'는 절대로 호출되지 않기 때문에 실행중인 작업이없는 것 같습니다. – CAB

+0

풀 (pool)이 자동으로 작업자들 사이에서 작업을 나누고, args를 잘못 전달하고 있습니다. – Aaron

답변

0

multiprocessing의 사용을 고려하면 대개 데이터가 매우 큽니다. N 크기의 배열에 대해 N 개의 프로세스를 사용한 것처럼 각 데이터에 하나의 프로세스를 할당하는 것은 의미가 없습니다.

은이 두 가지 접근 방법을 고려

1) 각 프로세스는 배열의 덩어리를 처리합니다. flip_many()partition()

2) 각 데이터는 풀 작업자에 매핑됩니다. flip_one()

나머지 코드는 원래 코드와 매우 비슷합니다.

from multiprocessing import Array, Pool, Process 

def flip_many(start_idx, end_idx): 
    for idx in range(start_idx, end_idx + 1): 
     a[idx] = not(a[idx]) 

def flip_one(idx): 
    a[idx] = not(a[idx]) 
    return idx, a[idx] 

def cb(result): 
    print(result) 

def partition(range_, n): 
    start, end = range_ 
    size = (end - start) // n 
    ranges = [] 
    for _ in range(n): 
     ranges.append((start, start+size-1)) 
     start += size 
    if ranges[-1][1] != end-1: 
     ranges[-1] = (ranges[-1][0], end-1) 
    return ranges  

if __name__ == '__main__': 

    # size of array 
    N = 10 
    N_procs = 2 
    ranges = partition((0, N), N_procs) 

    # shared array - N bytes - unsynchronized - initialized to zeros 
    a = Array('B', N, lock=False) 
    print([a[i] for i in range(N)], "elements of array initialized to 0")  

    # flip values to ones using Process 

    processes = [] 
    for i in range(N_procs): 
     p = Process(target=flip_many, args=(*ranges[i],)) 
     processes.append(p) 
     p.start() 

    for p in processes: 
     p.join() 

    print([a[i] for i in range(N)], "First flip by N processes, should be 1")  

    # flip values back to zeros using Pool 
    pool = Pool() 
    indices = range(N) 
    pool.map(flip_one, indices) 
    print([a[i] for i in range(N)], "Second flip by the pool.map ... 0") 

    pool.map(flip_one, indices, chunksize=N // N_procs) 
    print([a[i] for i in range(N)], "Third flip by the pool.map ... 1") 

    pool.map_async(flip_one, indices, callback=cb) 
    print([a[i] for i in range(N)], "Fourth flip by the pool.map_async ... 0") 
    print(" Due to the async nature, flip not reflected until .join()") 
    print(" But the callback returns the correct results:") 

    pool.close() 
    pool.join() 
    print([a[i] for i in range(N)], "Content after the join... 0") 
관련 문제