2016-07-01 2 views
6

새 개체 목록을 만들기 위해 여러 프로세스를 시작합니다. htop은 1 ~ 4 개의 프로세스를 보여줍니다 (항상 3 개의 새 오브젝트를 생성합니다). 훨씬 아래로 둔화 Python3 : 다중 처리가 많은 RAM을 소비하고 속도가 느려짐

def foo(self): 
    with multiprocessing.Pool(processes=3, maxtasksperchild=10) as pool: 
     result = pool.map_async(self.new_obj, self.information) 
     self.new_objs = result.get() 
     pool.terminate() 
    gc.collect() 

내가 foo() 여러 번, 그것은 전체 프로세스가 느리게 실행 호출 될 때마다 호출 프로그램도, 결국 완료되지 않습니다. 이 프로그램은 모든 RAM을 먹기 시작하지만 순차적 접근 방식은 중요한 RAM 사용량이 없습니다.

내가 프로그램을 죽일 때, 이것은 대부분 프로그램이 마지막으로 실행 한 기능이었습니다.

->File "threading.py", line 293, in wait 
    waiter.acquire() 

편집 내 상황에 대한 몇 가지 정보를 제공합니다. 노드로 구성된 트리를 만듭니다. foo()은 자식 노드를 만들기 위해 부모 노드에서 호출됩니다. 프로세스에 의해 반환 된 result은 이러한 자식 노드입니다. 그것들은 부모 노드의리스트에 저장됩니다. 순차적 인 방식으로 생성하는 대신 그 자식 노드의 생성을 병렬 처리하고 싶습니다.

답변

2

귀하의 문제는 주로 병렬화 된 기능이 메서드 개체와 관련이 있다고 생각합니다. 당신이 인쇄되는 "Dead" 다음에 "Born"을 볼 기대할 수 있도록 한 번만라고 지금 Container

import multiprocessing as mp 
import numpy as np 
import gc 


class Object(object): 
    def __init__(self, _): 
     self.data = np.empty((100, 100, 100), dtype=np.float64) 


class Container(object): 
    def __new__(cls): 
     self = object.__new__(cls) 
     print("Born") 
     return self 

    def __init__(self): 
     self.objects = [] 

    def foo(self): 
     with mp.Pool(processes=3, maxtasksperchild=10) as pool: 
      result = pool.map_async(self.new_obj, range(50)) 
      self.objects.extend(result.get()) 
      pool.terminate() 
     gc.collect() 

    def new_obj(self, i): 
     return Object(i) 

    def __del__(self): 
     print("Dead") 


if __name__ == '__main__': 
    c = Container() 
    for j in range(5): 
     c.foo() 

; 그것은 더 많은 정보없이 확신 할 수 있지만,이 작은 장난감 프로그램을 고려하는 것이 어렵다 프로세스에 의해 실행되는 코드는 컨테이너의이므로 전체 컨테이너은 을 의미합니다.

Born 
Born 
Born 
Born 
Born 
Dead 
Born 
Dead 
Dead 
Born 
Dead 
Born 
... 
<MANY MORE LINES HERE> 
... 
Born 
Dead 

자신을 설득하기 위해 전체 컨테이너가 복사되어 있고 주변의 모든 시간을 보내 컨테이너마다 실행지도의에 재건되는대로이 실행, 당신은 혼합 된 "Born""Dead"의 흐름을 볼 수, 일부 비 serialisable 값을 설정하려고 : 그것은 컨테이너에는 직렬화 할 수 없기 때문에

def foo(self): 
    with mp.Pool(processes=3, maxtasksperchild=10) as pool: 
     result = pool.map_async(self.new_obj, range(50)) 
     self.fn = lambda x: x**2 
     self.objects.extend(result.get()) 
     pool.terminate() 
    gc.collect() 

즉시 AttributeError을 올릴 것이다있다.

은 이제 정리해 보자 : 풀 1000 개 요청을 보낼 때, Container는 프로세스로 전송, 직렬화가 1000 번를 직렬화 복원됩니다. 물론, 그들은 결국 떨어질 것입니다 (너무 많은 이상한 교차 참조가 없다고 가정 할 때), 객체가 직렬화되고, 업데이트되고, 재구매 될 때 RAM에 많은 압력을 가할 것입니다. 요소를 입력하십시오.

어떻게 해결할 수 있습니까? 음, 이상적, 공유하지 않는 상태 :

def new_obj(_): 
    return Object(_) 


class Container(object): 
    def __new__(cls): 
     self = object.__new__(cls) 
     print("Born") 
     return self 

    def __init__(self): 
     self.objects = [] 

    def foo(self): 
     with mp.Pool(processes=3, maxtasksperchild=10) as pool: 
      result = pool.map_async(new_obj, range(50)) 
      self.objects.extend(result.get()) 
      pool.terminate() 
     gc.collect() 

    def __del__(self): 
     print("Dead") 

이 짧은 시간에 완료하고, (하나의 Container이 지금까지 구축 될 때)에만 RAM에 가장 작은 소형 연식 비행선을 생산하고 있습니다.일부 내부 상태가 전달되어야하는 경우 압축을 풀어 다음과 같이 보내면됩니다.

def new_obj(tup): 
    very_important_state, parameters = tup 
    return Object(very_important_state=very_important_state, 
        parameters=parameters) 


class Container(object): 
    def __new__(cls): 
     self = object.__new__(cls) 
     print("Born") 
     return self 

    def __init__(self): 
     self.objects = [] 

    def foo(self): 
     important_state = len(self.objects) 
     with mp.Pool(processes=3, maxtasksperchild=10) as pool: 
      result = pool.map_async(new_obj, 
            ((important_state, i) for i in range(50))) 
      self.objects.extend(result.get()) 
      pool.terminate() 
     gc.collect() 

    def __del__(self): 
     print("Dead") 

이것은 이전과 같은 동작입니다. 이 절대적으로 인 경우 프로세스간에 몇 가지 변경 가능한 상태를 공유하는 것을 피할 수 없으며 매번 모든 것을 복사하지 않고도 수행 할 수 있으므로 the multiprocessing tools을 확인하십시오.

+0

제 편집을 참조하십시오. 그래서 당신을 이해한다면, 나는 각 프로세스에서 객체 외부에 extern 메서드를 호출해야합니다. – Jonas

+0

객체의 메소드 인 parallelize 함수'self.new_obj'는 각 부모 노드가 직렬화되어 각 호출에서 전송되도록 요구합니다. _function_'new_obj (...)'가 (단순, 고아, 'stateless') 새로운 노드를 반환하고 foo가 그것을 링크하는 것을 담당하도록 그 메소드를 추출 할 수 있다면 (참조 부모 <-> 자식을 추가하는 등. ..하지만 호출 프로세스 _에서),이 모든 문제는 사라질 가능성이 높습니다. 자식 프로세스는 minmal 상태 만 전송하면됩니다. – val

관련 문제