2013-07-19 8 views
6

프로세스를 실행하는 멀티 프로세싱 얻을 수 없다, 나는 확실하지 않다 _run() 기능. 모든 프로세스가 생성 된 것으로 보입니다. 그러나 단일 프로세스로 처리하는 것보다 더 빠르지는 않습니다. 기본적으로 run_**_normalizers() 함수에서 발생하는 일은 데이터베이스 (SQLAlchemy)의 큐 테이블에서 읽은 다음 몇 가지 HTTP 요청을 한 다음 노멀 라이저의 '파이프 라인'을 실행하여 데이터를 수정 한 다음 다시 데이터베이스에 저장하는 것입니다. 나는 쓰레드가 '무겁고'병렬 처리에 자주 사용되는 JVM 토지에서 갈 것입니다. 다중 처리 모듈이 파이썬의 GIL 한계를 극복해야한다고 생각했기 때문에 약간 혼란 스럽습니다.는 아래의 코드가 동시에 실행하지 않는 것 동시에

+0

다중 처리 모듈은 스레드가 아닌 프로세스를 사용합니다. 따라서 GIL의 영향을받지 않습니다. –

+0

코드를 테스트했으며 필수 기술은 정상입니다. 나는 'config' 사전이 많이 사용된다면 공유 된'config'에 대해 확신 할 수 없다. 프로세서가 병목 현상이 아닐 수도 있습니다. –

+0

필자는 워크 스테이션에서만 8GB 16GB RAM의 Linux를 실행했습니다. 1 또는 1, 8 또는 16 프로세스를 사용하면 아무런 변화가 없으며 시스템 리소스도 정상입니다. –

답변

3

내 다중 처리 문제가 수정되어 실제로 스레드가 전환되었습니다. 실제로 생각했던 것을 고쳐 썼는지 확신 할 수 없습니다. 모든 것을 다시 설계하고 작업자와 작업자를 만들었지 만, 지금은 비행하지 않는 것들이 있습니다. 여기에 내가 무슨 짓을했는지의 기초이다 :

import abc 
from Queue import Empty, Queue 
from threading import Thread 

class AbstractTask(object): 
    """ 
     The base task 
    """ 
    __metaclass__ = abc.ABCMeta 

    @abc.abstractmethod 
    def run_task(self): 
     pass 

class TaskRunner(object): 

    def __init__(self, queue_size, num_threads=1, stop_on_exception=False): 
     super(TaskRunner, self).__init__() 
     self.queue    = Queue(queue_size) 
     self.execute_tasks  = True 
     self.stop_on_exception = stop_on_exception 

     # create a worker 
     def _worker(): 
      while self.execute_tasks: 

       # get a task 
       task = None 
       try: 
        task = self.queue.get(False, 1) 
       except Empty: 
        continue 

       # execute the task 
       failed = True 
       try: 
        task.run_task() 
        failed = False 
       finally: 
        if failed and self.stop_on_exception: 
         print('Stopping due to exception') 
         self.execute_tasks = False 
        self.queue.task_done() 

     # start threads 
     for i in range(0, int(num_threads)): 
      t = Thread(target=_worker) 
      t.daemon = True 
      t.start() 


    def add_task(self, task, block=True, timeout=None): 
     """ 
      Adds a task 
     """ 
     if not self.execute_tasks: 
      raise Exception('TaskRunner is not accepting tasks') 
     self.queue.put(task, block, timeout) 


    def wait_for_tasks(self): 
     """ 
      Waits for tasks to complete 
     """ 
     if not self.execute_tasks: 
      raise Exception('TaskRunner is not accepting tasks') 
     self.queue.join() 

내가 모두) (wait_for_tasks를 호출 한 후 TaskRunner를 만들고 (그 중 수천)에 작업을 추가하고있다. 그래서 분명히 다시 아키텍처에서 제가했던 다른 문제가 수정되었습니다. 이상한.

1

여전히 멀티 프로세싱 솔루션을 찾고 있다면, 먼저 근로자의 풀을 사용하는 방법을 확인 할 수 있습니다, 당신은 자신에 NUM_THREADS 프로세스를 관리 할 필요가 없습니다 것입니다 : http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers

그리고 둔화 문제에 대해 config 객체를 _run 함수의 인수로 전달하려고 시도한 적이 있습니까? 나는 이것이 내부적으로 변화를 일으킬 지 여부를 알지 못하지만 그것이 무엇인가를 바꿀 수 있다고 생각합니다.

관련 문제