2013-07-25 5 views
0

8 개의 CPU 코어와 200 개의 작업이 있습니다. 각 작업은 분리되어 있습니다. 결과를 기다리거나 공유 할 필요가 없습니다. 8 가지 작업/프로세스를 한 번에 (최대) 실행할 수있는 방법을 찾고 있는데 그 중 하나가 완료되었습니다. 나머지 작업은 자동 시작 프로세스입니다.파이썬 다중 처리 예약 작업

자식 프로세스가 완료된시기를 알고 새 자식 프로세스를 시작하는 방법. 우선 프로세스 (다중 처리)를 사용하려고하는데 알아 내기가 어렵습니다. 그런 다음 동적 인스턴스화를 사용해야하는 이유로 피클 문제로 풀과 얼굴을 사용하려고합니다.

편집 : 수 없습니다 피클 : 내장 .instancemethod이

실패 속성 조회하지만 풀

class Collectorparallel(): 

def fire(self,obj): 
    collectorController = Collectorcontroller() 
    collectorController.crawlTask(obj) 

def start(self): 
    log_to_stderr(logging.DEBUG) 
    pluginObjectList = [] 
    for pluginName in self.settingModel.getAllCollectorName(): 
     name = pluginName.capitalize() 
     #Get plugin class and instanitiate object 
     module = __import__('plugins.'+pluginName,fromlist=[name]) 
     pluginClass = getattr(module,name) 
     pluginObject = pluginClass() 
     pluginObjectList.append(pluginObject) 



    pool = Pool(8) 
    jobs = pool.map(self.fire,pluginObjectList) 
    pool.close() 

    print pluginObjectList 

pluginObjectList 가지고

[<plugins.name1.Name1 instance at 0x1f54290>, <plugins.name2.Name2 instance at 0x1f54f38>] 

PicklingError 같은 내 코드를 추가 처리 버전은 정상적으로 작동합니다.

+0

해결책은'multiprocessing.Pool'을 사용하는 것입니다. 창 살 클리핑 문제를 해결하는 방법은 코드에 따라 다릅니다. ** 일부 코드 게시 ** 테스트 용으로 사용할 수있는 * small * 및 * self-contained * 예제를 작성할 수 있고이를 사용 사례를 명확하게 보여줄 수 있다면. – Bakuriu

+0

@Bakuriu 내 코드를 추가했습니다. – Runicer

답변

0

경고 이것은 배포 및 상황에 다소 주관적이지만 현재 설정은 다음과 같습니다.

나는 근로자 프로그램을 보유하고 있으며, 6 복사본 (최대 6 코어)을 사용합니다. 각 작업자는 다음을 수행합니다.

  1. 연결합니다 레디 스 인스턴스
  2. 시도하고
  3. 중 하나가 유휴 또는 '큐'
에서 작업의 부족에 종료 로그인 정보를 이륙 특정 목록의 몇 가지 작업을 팝업

각 프로그램은 기본적으로 독립 실행 형이지만 별도의 대기열 시스템에서 필요한 작업을 수행합니다. 프로세스에 아무런 영향을 미치지 않으므로 문제의 해결책이 될 수 있습니다.

0

문제의 해결책은 간단합니다. 우선, 방법은 절대로 피할 수 없습니다. pickle's documentation에 나와있는 사실 만 종류의 절인 할 수 있습니다

  • None , True , and False
  • integers, long integers, floating point numbers, complex numbers
  • normal and Unicode strings
  • tuple s, list s, set s, and dict ionaries containing only picklable objects
  • functions defined at the top level of a module
  • built-in functions defined at the top level of a module
  • classes that are defined at the top level of a module
  • instances of such classes whose __dict__ or the result of calling __getstate__() is picklable (see section The pickle protocol for details).

[...]

Note that functions (built-in and user-defined) are pickled by “fully qualified” name reference, not by value. This means that only the function name is pickled, along with the name of the module the function is defined in. Neither the function’s code, nor any of its function attributes are pickled. Thus the defining module must be importable in the unpickling environment, and the module must contain the named object, otherwise an exception will be raised. [4]

Similarly, classes are pickled by named reference, so the same restrictions in the unpickling environment apply. Note that none of the class’s code or data is pickled[...]

은 분명히 방법은 따라서이 절인 할 수없는, 모듈의 최상위 수준에 정의 된 함수가 아닙니다 (문서의 부분을주의 깊게 읽어 보시기 바랍니다. ! 피클 미래의 문제를 피할 수)하지만 전역 함수와 방법을 교체하고 추가 매개 변수로 self를 전달하기 위해 절대적으로 사소한하기 :

import itertools as it 


def global_fire(argument): 
    self, obj = argument 
    self.fire(obj) 


class Collectorparallel(): 

    def fire(self,obj): 
     collectorController = Collectorcontroller() 
     collectorController.crawlTask(obj) 

    def start(self): 
     log_to_stderr(logging.DEBUG) 
     pluginObjectList = [] 
     for pluginName in self.settingModel.getAllCollectorName(): 
      name = pluginName.capitalize() 
      #Get plugin class and instanitiate object 
      module = __import__('plugins.'+pluginName,fromlist=[name]) 
      pluginClass = getattr(module,name) 
      pluginObject = pluginClass() 
      pluginObjectList.append(pluginObject) 



     pool = Pool(8) 
     jobs = pool.map(global_fire, zip(it.repeat(self), pluginObjectList)) 
     pool.close() 

     print pluginObjectList 

주, 그 Pool.map는 하나의 인수로 주어진 함수를 호출하기 때문에, 우리는 self과 실제 인수를 모두 "함께 묶어야"합니다. 이렇게하려면 zipit.repeat(self) 및 원래 iterable 있습니다.

전화가 걸리는 순서에 신경 쓰지 않으면 pool.imap_unordered일 수도 있고 일 때 더 나은 성능을 제공 할 수 있습니다. 그러나 iterable 및 목록을 반환하므로 결과 목록을 원할 경우 jobs = list(pool.imap_unordered(...))해야 할 것입니다.

0

이 코드는 모든 산세 문제를 제거합니다. 여기에 무슨 일이 일어 났는지

class Collectorparallel(): 

def __call__(self,cNames): 
    for pluginName in cNames: 
     name = pluginName.capitalize() 
     #Get plugin class and instanitiate object 
     module = __import__('plugins.'+pluginName,fromlist=[name]) 
     pluginClass = getattr(module,name) 
     pluginObject = pluginClass() 
     pluginObjectList.append(pluginObject) 

    collectorController = Collectorcontroller() 
    collectorController.crawlTask(obj) 

def start(self): 
    log_to_stderr(logging.DEBUG) 
    pool = Pool(8) 
    jobs = pool.map(self,self.settingModel.getAllCollectorName()) 
    pool.close() 

Collectorparallel가 호출로 전환 된 것입니다. 플러그인 이름의 목록은 풀의 반복 가능한 것으로 사용되며 플러그인의 실제 결정과 인스턴스화는 각 작업자 프로세스에서 수행되며 클래스 인스턴스 객체는 각 작업자 프로세스의 호출 가능 객체로 사용됩니다.