2013-08-05 5 views
1

저는 직원 풀이있는 프로젝트를 진행하고 있습니다. 내장 된 multiprocessing.Pool을 사용하고 있지 않지만 자체 프로세스 풀을 만들었습니다.하위 프로세스를 정상적으로 종료 처리

작동 방식은 multiprocessing.Queue의 두 인스턴스를 작성한 것입니다. 하나는 작업자에게 작업 태스크를 보내고 다른 하나는 결과를 다시 수신하는 것입니다.

while True: 
    try: 
     request = self.request_queue.get(True, 5) 
    except Queue.Empty: 
     continue 
    else: 
     result = request.callable(*request.args, **request.kwargs) 
     self.results_queue.put((request, result)) 

일부 오류 처리 코드가있다,하지만 난 brewity 위해 그것을 밖으로 떠난 :

각 작업자는이 같은 영구적으로 실행 루프에 앉아있다. 각 작업자 프로세스는 daemon1으로 설정되어 있습니다.

주 프로세스와 모든 하위 작업자 프로세스를 올바르게 종료하고 싶습니다. 지금까지 (Ctrl + C를하고) 내 경험 : 특별한 구현과

  • 각 자식 프로세스는 KeyboardInterrupt의 추적과/충돌을 중지하지만, 주요 프로세스가 존재하고 살해 할()를 가지고 있지 않습니다.
  • 자식 프로세스에 대한 신호 처리기를 구현하고 SIGINT를 무시하도록 설정하면 주 스레드에서 KeyboardInterrupt tracebok를 표시하지만 아무 일도 발생하지 않습니다.
  • 하위 프로세스와 주 프로세스에 대한 신호 처리기를 구현하면 신호 처리기가 주 프로세스에서 호출되지만 sys.exit()을 호출해도 영향을 미치지 않는 것으로 나타납니다.

나는 이것을 처리하는 "최상의 방법"을 찾고있다. 또한 QueuePipe과 상호 작용하는 프로세스를 종료하면 다른 프로세스와 교착 상태를 일으킬 수 있습니다 (세마포 및 기타 내부적으로 사용되는 것으로 인해).

내 접근 방식은 다음과 같습니다. - 주 루프를 종료 할 각 프로세스 (별도의 명령 대기열 또는 유사 사용)에 내부 신호를 보내는 방법을 찾습니다. - shutdown 명령을 보내는 주 루프에 대한 신호 처리기를 구현합니다. 자식 프로세스는 신호를 무시하도록 설정하는 자식 처리기를 갖습니다.

이 방법이 맞습니까?

답변

0

주의해야 할 점은 종료하려는 큐에 메시지가 있다는 가능성을 처리하여 프로세스가 입력 대기열을 깨끗하게 배출 할 수있는 방법이 필요하다는 것입니다. 주 프로세스가 시스템 종료 시간임을 인식하는 프로세스라고 가정하면이 작업을 수행 할 수 있습니다.

  1. 각 작업자 프로세스에 감시를 보냅니다. 이것은 보통 메시지처럼 보이지 않는 특수 메시지 (자주 None)입니다. 센티넬 후에 각 작업자 프로세스의 큐를 비우고 닫습니다. 당신의 작업자 프로세스에서
  2. 는 다음 의사 코드와 유사한 코드를 사용 :

는 여러 프로세스에 메시지를 보낼 수있는 가능성이있는 경우

while True: # Your main processing loop 
    msg = inqueue.dequeue() # A blocking wait 
    if msg is None: 
     break 
    do_something() 
outqueue.flush() 
outqueue.close() 
inqueue 좀 더 정교한 접근이 필요합니다.이 샘플은 logging.handlers.QueueListenermonitor 메서드에 대한 소스 코드에서 가져온 것으로 Python 3.2 이상에서 하나의 가능성을 보여줍니다.

  """ 
      Monitor the queue for records, and ask the handler 
      to deal with them. 

      This method runs on a separate, internal thread. 
      The thread will terminate if it sees a sentinel object in the queue. 
      """ 
      q = self.queue 
      has_task_done = hasattr(q, 'task_done') 
      # self._stop is a multiprocessing.Event object that has been set by the 
      # main process as part of the shutdown processing, before sending 
      # the sentinel   
      while not self._stop.isSet(): 
       try: 
        record = self.dequeue(True) 
        if record is self._sentinel: 
         break 
        self.handle(record) 
        if has_task_done: 
         q.task_done() 
       except queue.Empty: 
        pass 
      # There might still be records in the queue. 
      while True: 
       try: 
        record = self.dequeue(False) 
        if record is self._sentinel: 
         break 
        self.handle(record) 
        if has_task_done: 
         q.task_done() 
       except queue.Empty: 
        break 
+0

나는 그것을 많이 쉽게 각 프로세스에 신호를 "중단"전송하도록 만든 threading.Event의 프로세스 기반의 복제, 있다는 것을 발견하고, 나는 당신이 쓴 것과 유사한 무언가를 설정 . –

관련 문제