2013-07-29 3 views
0

나는 N 개의 소비자로 구성된 프로듀서를 가지고있다.파이썬 프로세스 간 큐 오버플로

생산자는 대량의 TCP 메시지 (1 분당 10,000 개)를 수신하는 소켓을 청취하고,이 데이터를 읽고이를 작업자를위한 대기열에 넣습니다.

iterations = 0 
work_iterations = 0 
while True: 
    try: 
    iterations += 1 
    data = queue.get(block=False) 
    work_iterations +=1 
    do_work(data) 
    except Queue.Empty: 
    time.sleep(0.001) #avoid high CPU usage 


    if iterations == 100: 
    load = float(work_iterations/iterations) 
    print load 
    iterations = 0 
    work_iterations = 0 

이 코드를 단순화,하지만 당신은 내가 작업자 부하를 볼 수 있지만 얼마나 많은 반복을 볼려고 볼 수 있습니다

노동자는 I는 다음과 같이 큐에서 읽을-설정 한 100의 직원은 실제로 대기열에서 작업을 끌어낼 수있었습니다. 부하가 지속적으로 100/100이라면 생산자/고객 대기열이 백 로그됨을 알게됩니다. 이론적으로이 이어야합니다.

출력에서 ​​볼 수있는 것은 0.97, 0.99 및 매우 적은 1.0입니다. 그러나 큐는 몇 분 내에 채우지 만 (크기 제한은 10,000입니다), 프로듀서 쪽에서 데이터를 삭제해야합니다. 왜 이런 일이 일어나고 있는지 조명하는 사람이 있습니까? 작업자 프로세스가 평균 97/100 회 반복 작업을하는 경우 대기열이 빈 no에 가까워 야 함을 의미합니까?

답변

-1

block = Flase 및 time.sleep()을 제거하면 어떻게됩니까? 근로자 수를 계산할 수 없습니다.

0

queue.get (block = False)를 호출하면 대기열이 실제로 비어 있지 않아도 Queue.Empty가 발생할 수 있습니다. 현재 프로세스가 큐에 액세스하기 위해 잠금을 얻을 수없는 경우 실제로 큐에있는 항목 수에 관계없이 Queue.Empty가 발생합니다.

멀티/queues.py에서 Queue.get() 코드에서 빠른보기 : 큐 실제로 예외를 발생하기 전에 얼마나 전체에 대한 검사가없는

126 if not self._rlock.acquire(block, timeout): 
127  raise Empty 

공지있다. 대기열에 넣을 정보가 너무 많기 때문에 Queue.Empty가 몇 번 제기되었다고 생각합니다. 실제로 큐에 넣는 동안 잠금을 보유한 제작자가 원인으로 인해 작업자가 대기열에 액세스하려는 시도가 실패하게됩니다.

당신은 당신의 코드에 작은 변화로 이것을 확인할 수 있습니다

except Queue.Empty: 
    print queue.qsize() # returns the approximate number of elements in the queue 

the documentation가 말했듯이,이 숫자는 완벽하게 신뢰할 수 없습니다. 그러나 대기열에있는 많은 수의 항목을 다루기 때문에 대기열이 0 또는 10,000에 가까워 졌는지 알려주려면 충분히 근접해야합니다.