2013-02-03 2 views
22

무거운 계산을 병렬 처리하기 위해 multiprocessing.Pool()을 사용하고 있습니다.큰 데이터로 다중 처리

대상 함수는 많은 양의 데이터 (거대한 목록)를 반환합니다. 나는 RAM이 부족하다.

multiprocessing이 없으면 대상 함수를 생성자로 변경하고 yield 결과 요소를 하나씩 차례로 계산합니다.

멀티 프로세싱이 생성자를 지원하지 않는다는 것을 이해합니다. 출력물 전체를 기다렸다가 즉시 반환합니다. 맞습니까? 항복하지 마라. Pool 근로자가 RAM에 전체 결과 배열을 만들지 않고도 데이터를 사용할 수있게되는 즉시 데이터를 산출 할 수있는 방법이 있습니까?

간단한 예 :

def target_fnc(arg): 
    result = [] 
    for i in xrange(1000000): 
     result.append('dvsdbdfbngd') # <== would like to just use yield! 
    return result 

def process_args(some_args): 
    pool = Pool(16) 
    for result in pool.imap_unordered(target_fnc, some_args): 
     for element in result: 
      yield element 

이 파이썬 2.7입니다.

답변

15

이 대기열에 이상적인 사용 사례처럼 들린다 :

http://docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-processes 단순히 풀링 된 근로자 대기열에 결과를 먹이고 마스터에서 그들을 섭취.

작업자가 채우는 속도보다 거의 빨리 대기열을 배출하지 않으면 메모리 압력 문제가 발생할 수 있습니다. 대기열 크기 (대기열에 들어갈 수있는 최대 객체 수)를 제한 할 수 있습니다.이 경우 풀 작업자는 대기열에서 공간을 사용할 수있을 때까지 queue.put 문에서 차단합니다. 이렇게하면 메모리 사용량이 늘어납니다. 그러나이 작업을 수행하는 경우 풀링이 필요한지 그리고/또는 더 적은 수의 근로자를 사용하는 것이 합리적인지 다시 생각해 볼 때가 있습니다.

+1

대기열에서 피클링 된 데이터가 전달됩니다. 따라서 데이터 -> pickle-> unpickle-> 새로운 데이터 사본. 이렇게하면 프로그램이 느려지고 RAM을 더 많이 사용하게됩니다. 대신 공유 메모리를 사용하는 것을 고려해야합니다. – Wang

3

작업이 청크로 데이터를 반환 할 수있는 경우 작은 작업으로 나눌 수 있으며 각각 하나의 청크가 반환됩니까? 분명히 이것은 항상 가능하지는 않습니다. 그렇지 않은 경우에는 Loren Abrams가 제안한대로 Queue과 같은 다른 메커니즘을 사용해야합니다. 그러나 일 때,이 문제를 해결할뿐만 아니라 다른 이유로 더 나은 해결책 일 수 있습니다.

예를 들어, 이것은 분명히 할 수 있습니다. 예를 들어 :

def target_fnc(arg, low, high): 
    result = [] 
    for i in xrange(low, high): 
     result.append('dvsdbdfbngd') # <== would like to just use yield! 
    return result 

def process_args(some_args): 
    pool = Pool(16) 
    pool_args = [] 
    for low in in range(0, 1000000, 10000): 
     pool_args.extend(args + [low, low+10000] for args in some_args) 
    for result in pool.imap_unordered(target_fnc, pool_args): 
     for element in result: 
      yield element 

은 (당신이 원하는 경우는 물론, 중첩 된 이해와 루프, 또는 zipflatten을 대체 할 수있다.) some_args[1, 2, 3] 경우

그래서, 당신은 300를 얻을 수 있습니다 작업 - [[1, 0, 10000], [2, 0, 10000], [3, 0, 10000], [1, 10000, 20000], …], 각 요소는 1000000 대신 10000 개의 요소 만 반환합니다.

3

사용자의 설명을 보면 백만 가지 요소가 전달되는 것을 피하는 것처럼 데이터를 처리하는 데별로 관심이없는 것 같습니다. list 뒷면.

더 간단한 방법이 있습니다. 데이터를 파일에 넣기 만하면됩니다. 예를 들면 :

분명히
def target_fnc(arg): 
    fd, path = tempfile.mkstemp(text=True) 
    with os.fdopen(fd) as f: 
     for i in xrange(1000000): 
      f.write('dvsdbdfbngd\n') 
    return path 

def process_args(some_args): 
    pool = Pool(16) 
    for result in pool.imap_unordered(target_fnc, some_args): 
     with open(result) as f: 
      for element in f: 
       yield element 

결과가 개행 문자를 포함하거나 등, 문자열이 아닌 수 있다면, 당신은 대신 간단한 텍스트 파일 등 csv 파일, numpy를 사용하는 것이 좋습니다하지만, 아이디어는 동일합니다.

이것이 더 간단하더라도 일반적으로 한 번에 데이터를 처리하면 이익이 발생하므로 작업을 해체하거나 Queue을 사용하는 것이 좋습니다 (다른 두 답변과 마찬가지로). 단점 (각각, 작업을 망치거나 생산 된만큼 빨리 데이터를 소비 할 수 있어야 함)은 거래 차단자가 아닙니다.