2014-08-28 2 views
0

저는 다중 처리 모듈을 처음 사용하기 때문에 클래스가 통신 할 수있는 작업자 클래스를 생성 할 수 있는지 궁금합니다. 나는 객체가 데이터를받는 인스턴스를 가지고 있으며 객체 자체 인 데이터를 기반으로 속성을 빠르게 생성해야한다. 다음 코드는 단순한 경우입니다.Python의 다중 처리 모듈을 사용하여 클래스 속성 생성

class Worker(multiprocessing.Process): 

    def __init__(self, in_queue, out_queue): 
     multiprocessing.Process.__init__(self) 
     self.in_queue = in_queue 
     self.out_queue = out_queue 

    def run(self): 

     #Loop through the in_queue until a None is found 
     for tup in iter(self.in_queue.get,None): 

      #Try to manipulate the data 
      try: 
       self.out_queue.put(tup[0]*tup[1]) 
      except: 
       self.out_queue.put('Error') 
      self.in_queue.task_done() 

     #Remove the None from the in_queue 
     self.in_queue.task_done() 

class Master(): 

    def __init__(self,data): 

     #Initialize some data to be operated on 
     self.data=data 

    def gen_attributes(self): 

     #Initialize Queues to interact with the workers 
     in_queue=multiprocessing.JoinableQueue() 
     out_queue=multiprocessing.Queue() 

     #Create workers to operate on the data 
     for i in range(4): 
      Worker(in_queue,out_queue).start() 

     #Add data to the input queue 
     for tup in self.data: 
      in_queue.put(tup)   

     #Stop Condition for each worker 
     for _ in range(4): 
      in_queue.put(None) 
     in_queue.close() 

     #Wait for processes to finish 
     in_queue.join() 

     #Store the output as new attributes of the class 
     self.attributes=[] 
     for _ in range(0,out_queue.qsize()): 
      self.attributes.append(out_queue.get()) 

#Create and instance of the Master class, and have it generate it's own attributes using the multiprocessing module 
data=[(1,2),(2,2),(3,4)] 
M=Master(data) 
M.gen_attributes() 
print M.attributes 

본질적으로 마스터 클래스의 인스턴스는 주어진 데이터로 생성됩니다. 그런 다음 마스터 클래스는 해당 데이터를 여러 작업자에게 전달하여 출력 대기열에서 작업하고 출력 대기열에 배치합니다. 그런 다음 Master 클래스는 해당 출력을 사용하여 자체 특성을 지정합니다.

답변

0

multiprocessing.Pool의 완벽한 사용 예입니다. 프로그램이 중단되고 Master에 예상 한 특성이 나타나지 않을 수 있습니다. 그 이유는 프로그램이 out_queue에서 읽는 시점에 비어 있기 때문에 대기열에서 정보를 읽으려면 out_queue에 블록이 필요하기 때문에 그 시점에서 아무런 정보도받지 못했기 때문입니다.

간단한 수정 블록 등 같은 큐의 get에있어서, 제한 시간을 기다릴 것이다 :이 작동

while True: 
    attr = out_queue.get(True, 0.1) 
    if not attr: 
    break 
    self.attributes.append(attr) 

하지만 깨끗한 용액이 아니다. 이것을 수정하고 원하는 결과를 얻기 위해 실험 해 볼 수 있습니다. 전리품 값을 사용하는 것이 좋습니다.

+0

Sean, 답장을 보내 주셔서 감사합니다. 그것은 매우 흥미 롭습니다. in_queue.join()에 매달려 있다고 생각했습니다. 즉 작업자가 작업을 수행하지 못하고있었습니다. 그러나 in_queue.join()을 전달하는 경우 모든 작업이 완료되고 out_queue가 결과로 가득해야한다는 의미가 아닙니까? 나는 또한 모든 근로자들이 합류했는지 확인하고 위와 동일한 문제를 겪는 지 실험했다. – Adam

+0

Sean, 솔루션을 구현했는데 제대로 작동하지 않는 것 같습니다. 구현이 있습니까? – Adam

+0

나는 그것을한다. 그러나 그것은 나의 개인의 휴대용 개인 컴퓨터 위에있다. 내가 일하러 나갈 때 나는 그것을 게시하려고 노력할 것이다. – sean

관련 문제