2014-05-23 4 views
1

저는 장고에서 스레드를 사용하여 "작업 관리자"와 같은 것을 만들려고합니다. 어떤 작업을 기다리고있을 것입니다.장고 다중 처리 및 빈 큐 넣기

import multiprocessing 
from Queue import Queue 


def task_maker(queue_obj): 
    while True: 
     try: 
      print queue_obj.qsize() # << always print 0 
      _data = queue_obj.get(timeout=10) 
      if _data: 
       _data['function'](*_data['args'], **_data['kwargs']) 
     except Empty: 
      pass 
     except Exception as e: 
      print e 


tasks = Queue() 
stream = multiprocessing.Process(target=task_maker, args=(tasks,)) 
stream.start() 


def add_task(func=lambda: None, args=(), kwargs={}): 
    try: 
     tasks.put({ 
      'function': func, 
      'args': args, 
      'kwargs': kwargs 
     }) 
     print tasks.qsize() # print a normal size 1,2,3,4... 

    except Exception as e: 
     print e 

사용자가 요청할 때 views.py 파일에서 "add_task"를 사용하고 있습니다. "스트림"의 대기열이 항상 비어있는 이유는 무엇입니까? 내가 뭘 잘못하고있어?

+0

['celery'] (http://docs.celeryproject.org/ko/latest/index.html)를 사용해 본 적이 있습니까? 사용하기 쉽고, 성숙하며, 장고와 잘 어울립니다. 휠을 재발견해야하는 압도적 인 이유가 없다면 정말이 도구가 가장 좋습니다. – dgel

답변

0

현재 코드에는 두 가지 문제가 있습니다. 1) 다중 처리 (스레드는 아님)를 사용하면 qsize() 기능이 신뢰할 수 없습니다. 혼동을 야기하므로 사용하지 않는 것이 좋습니다. 2) 대기열에서 가져온 객체를 직접 수정할 수 없습니다.

데이터를 앞뒤로 보내는 두 개의 프로세스를 고려하십시오. 다른 사람이 데이터를 수정했는지 여부는 데이터가 비공개이므로 알 수 없습니다. 통신하려면 Queue.put() 또는 Pipe을 사용하여 명시 적으로 데이터를 보내십시오.

생산자/소비자 시스템의 일반적인 작동 방식은 다음과 같습니다. 1) 작업이 대기열에 들어 있습니다. 2) 작업자 블록이 작업을 기다리고 있습니다. 작업이 나타나면 다른 대기열에 결과를 둡니다. 3) 관리자 또는 'beancounter'프로세스가 두 번째 큐의 출력을 소비하고 인쇄하거나 처리합니다.

재미있게 보내세요!

#!/usr/bin/env python 

import logging, multiprocessing, sys 


def myproc(arg): 
    return arg*2 

def worker(inqueue, outqueue): 
    logger = multiprocessing.get_logger() 
    logger.info('start') 
    while True: 
     job = inqueue.get() 
     logger.info('got %s', job) 
     outqueue.put(myproc(job)) 

def beancounter(inqueue): 
    while True: 
     print 'done:', inqueue.get() 

def main(): 
    logger = multiprocessing.log_to_stderr(
      level=logging.INFO, 
    ) 
    logger.info('setup') 

    data_queue = multiprocessing.Queue() 
    out_queue = multiprocessing.Queue() 

    for num in range(5): 
     data_queue.put(num) 

    worker_p = multiprocessing.Process(
     target=worker, args=(data_queue, out_queue), 
     name='worker', 
    ) 
    worker_p.start() 

    bean_p = multiprocessing.Process(
     target=beancounter, args=(out_queue,), 
     name='beancounter', 
     ) 
    bean_p.start() 

    worker_p.join() 
    bean_p.join() 
    logger.info('done') 


if __name__=='__main__': 
    main() 
1

나는 그것을 얻었습니다. 이유는 모르겠지만 "스레딩"을 시도했을 때 효과적이었습니다!

from Queue import Queue, Empty 
import threading 

MailLogger = logging.getLogger('mail') 


class TaskMaker(threading.Thread): 

    def __init__(self, que): 
     threading.Thread.__init__(self) 
     self.queue = que 

    def run(self): 
     while True: 
      try: 
       print "start", self.queue.qsize() 
       _data = self.queue.get() 
       if _data: 
        print "make" 
        _data['function'](*_data['args'], **_data['kwargs']) 
      except Empty: 
       pass 
      except Exception as e: 
       print e 
       MailLogger.error(e) 

tasks = Queue() 
stream = TaskMaker(tasks) 
stream.start() 


def add_task(func=lambda: None, args=(), kwargs={}): 
    global tasks 
    try: 
     tasks.put_nowait({ 
      'function': func, 
      'args': args, 
      'kwargs': kwargs 
     }) 

    except Exception as e: 
     print e 
     MailLogger.error(e) 
+0

(스레드는 포함하지 않음) 다중 프로세스에서 qsize() 함수는 신뢰할 수 없습니다. 스레드를 사용하면 같은 프로세스에서 대기열을 사용할 수 있으므로 신뢰할 수 있습니다. 여러 프로세스에서 하나의 proc이 크기를 확인하고 다른 프로세스가 동시에 요소를 추가하면 혼란 스러울 수 있습니다. 따라서'qsize'는 다중 처리에서 신뢰할 수 없습니다. – johntellsall