2012-04-27 7 views
0

나는 풀 할당을 위해 파이썬에서이 동작을 알아 냈다. 풀에 20 개의 프로세스가 있지만 8 개의 프로세스에 대해 map_async를 수행 할 때 모든 프로세스를 실행하는 대신 4 개의 실행 만 얻습니다. 그 4 개가 끝나면 2 개를 더 보내고 그 중 2 개가 끝내면 1 개를 보냅니다.다중 처리 - 풀 할당

20 이상을 던지면 위의 동작이 반복 될 때 대기열에서 20 개 미만으로 시작될 때까지 20 개 모두 실행됩니다.

이것은 의도적으로 수행되었다고 가정하지만 이상하게 보입니다. 내 목표는 들어오는 즉시 요청을 처리하도록하는 것이고 분명히이 동작은 적합하지 않습니다.

maxtasksperchild 지원

내가 어떻게 향상시킬 수있는 어떤 아이디어에 대한 billiard와 파이썬 2.6을 사용하십니까?

코드 :

내가 함수 function()를 사용하고자하는 데이터가 : 나는 파이썬에서 다중 처리

mypool = pool.Pool(processes=settings['num-processes'], initializer=StartChild, maxtasksperchild=10) 

while True: 
    lines = DbData.GetAll() 
    if len(lines) > 0: 
     print 'Starting to process: ', len(lines), ' urls' 
     Res = mypool.map_async(RunChild, lines) 
     Returns = Res.get(None) 
     print 'Pool returns: ', idx, Returns 
    else: 
     time.sleep(0.5) 

답변

2

한 가지 방법은 다음과 같다. 당신이 추가되는 것을 완벽하게 제어 할 수 있습니다 이런 식으로

results = [] 
while idqueue.qsize() < nbprocess: 
    pass 
while resultqueue.qsize() > 0: 
    results.append(resultqueue.get()) 

:

import multiprocessing 

class ProcessThread(multiprocessing.Process): 
    def __init__(self, id_t, inputqueue, idqueue, function, resultqueue): 
     self.id_t = id_t 
     self.inputlist = inputqueue 
     self.idqueue = idqueue 
     self.function = function 
     self.resultqueue = resultqueue 

     multiprocessing.Process.__init__(self) 

    def run(self): 
     s = "process number: " + str(self.id_t) + " starting" 
     print s 
     result = [] 

     while self.inputqueue.qsize() > 0 
      try: 
       inp = self.inputqueue.get() 
      except Exception: 
       pass 
      result = self.function(inp) 
      while 1: 
       try: 
        self.resultqueue.put([self.id,]) 
       except Exception: 
        pass 
       else: 
        break 
      self.idqueue.put(id) 
      return 

및 주요 기능 :

inputqueue = multiprocessing.Queue() 
resultqueue = multiprocessing.Queue() 
idqueue = multiprocessing.Queue() 

def function(data): 
    print data # or what you want 

for datum in data: 
    inputqueue.put(datum) 

for i in xrange(nbprocess): 
    ProcessThread(i, inputqueue, idqueue, function, resultqueue).start() 

그리고 마지막으로 얻을 결과
먼저 나는 멀티 프로세싱 서브 클래스를 생성 프로세스 및 기타 물건. 다중 프로세스 inputqueue을 사용하면 다른 프로세스를 큐에 동시 액세스 (예외를 사용하는 이유) 때문에 각 데이터 계산이 매우 느린 경우에만 (< 1,2 초) 효율적인 방법입니다. 함수가 매우 빠르게 계산되는 경우에는 데이터를 한 번만 분할하여 bgining하고 데이터 세트의 청크를 모든 프로세스의 시작 부분에 넣습니다.

+0

감사합니다. 덕분에 스크립트가 수정되었습니다. 나는 기본 풀 처리를 없애고 예제를 기반으로 내 자신을 구현했습니다. – SorinV