2014-06-20 2 views
1

내가 5 개 항목에 대기열이 있다고 가정 할 때 RabbitMQ 작업을 requeueing :이 큐의 선두에서 메시지를는 거부하고 prefetch_count == 1

(tail) E, D, C, B, A (head) 

을하지만, 그 메시지 A를 결정하는 처리에 적합하지 않습니다 현재로서는. 나는 requeue=True에 해당 항목을 reject, 큐가된다 :

(tail) A, E, D, C, B (head) 

나는 다음 B, C, DE, ack 각 하나를 보내고 소모한다. 이제 큐는 A만을 보유하고 있으며, 나는 계속해서 소비하고, 결코 끝나지 않는 루프에서는 reject을 계속 반복합니다. 새로운 A 메시지가 들어 오면 거의 즉시 소비 된 다음 프로세스는 A을 소비하려고하는 루프를 다시 시작합니다.

나는이는 새앙 토끼의 문서에서 Twisted Consumer Example에 약간의 수정으로 수행

import pika 
from pika import exceptions 
from pika.adapters import twisted_connection 
from twisted.internet import defer, reactor, protocol,task 


@defer.inlineCallbacks 
def run(connection): 

    channel = yield connection.channel() 

    exchange = yield channel.exchange_declare(exchange='topic_link',type='topic') 

    queue = yield channel.queue_declare(queue='hello', auto_delete=False, exclusive=False) 

    yield channel.queue_bind(exchange='topic_link',queue='hello',routing_key='hello.world') 

    #yield channel.basic_qos(prefetch_count=1) 

    queue_object, consumer_tag = yield channel.basic_consume(queue='hello',no_ack=False) 

    l = task.LoopingCall(read, queue_object) 

    l.start(0.01) 


@defer.inlineCallbacks 
def read(queue_object): 

    ch,method,properties,body = yield queue_object.get() 

    print body 

    if body == 'A': 
     yield ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True) 
    else: 
     yield ch.basic_ack(delivery_tag=method.delivery_tag) 


parameters = pika.ConnectionParameters() 
cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters) 
d = cc.connectTCP('hostname', 5672) 
d.addCallback(lambda protocol: protocol.ready) 
d.addCallback(run) 
reactor.run() 

문제점 : 참고 다음 주석 처리 된 라인 :

#yield channel.basic_qos(prefetch_count=1) 

내가 그 주석을 해제 소비자가 메시지 A에 도달하면 reject 이후에 즉시 다시 선택하고 그 뒤에있는 대기열에서 대기중인 다른 항목은 무시합니다. 거부 된 항목을 대기열의 꼬리에 배치하는 대신 계속해서 다시 시도하여 계속 대기열의 모든 항목을 완전히 차단합니다.

줄을 주석으로 처리하면 제대로 작동합니다 (조금 더 느리지 만). 회선이 있고 prefetch_count > 1 인 경우에도 작동합니다. 정확히 1으로 설정하면 문제가 발생합니다.

메시지 거부에 누락 된 단계가 있습니까? A? 아니면 Pika의 선불 시스템이 근본적인이 사건과 호환되지 않습니까?

답변

3

소비자가 한 명이라면 RabbitMQ는 거부 된 소비자에게 메시지를 보내는 것 외에 다른 방법이 없습니다 (basic.reject 또는 basic.nack의 경우와 관계없이).

prefetch_count > 1을 설정하면 소비자가 반복 메시지 옆에 머리글의 반복문을 추가하고 반복문 옆에 문자 메시지를 표시합니다. 실수로 얻는 경우가 rejected 메시지 플래그를 확인하는 것이 캐치 될 수 있도록

N*M는 < = M 모든 메시지 (등등 불타 CPU와에 이르게) 반복되는 것 prefetch_count <= N 소비자의 번호로 메시지를 루프 메시지가 이미 재전송 된 경우 몇 가지 고급 논리가 있습니다.

+0

동일한 메시지를 반복적으로 반복함으로써 CPU 시간이 소모되지 않는다고 가정 해 봅니다. 프리 페치 대기열에서 메시지를 거부하여 모든 메시지가 루프가 돌아 오기 전에 통과 할 수 있도록 할 수 있습니까? – smitelli

+0

유효하지 않은 메시지를 완전히 삭제하고 [Dead Lettering] (http://www.rabbitmq.com/dlx.html)을 사용하여 다른 큐에 넣으면 나중에 수동으로 또는 다른 방법으로 처리 할 수 ​​있습니다. 메시지 흐름이 얼마나 활발한 지 – pinepain

관련 문제