내가 5 개 항목에 대기열이 있다고 가정 할 때 RabbitMQ 작업을 requeueing :이 큐의 선두에서 메시지를는 거부하고 prefetch_count == 1
(tail) E, D, C, B, A (head)
을하지만, 그 메시지 A
를 결정하는 처리에 적합하지 않습니다 현재로서는. 나는 requeue=True
에 해당 항목을 reject
, 큐가된다 :
(tail) A, E, D, C, B (head)
나는 다음 B
, C
, D
및 E
, ack
각 하나를 보내고 소모한다. 이제 큐는 A
만을 보유하고 있으며, 나는 계속해서 소비하고, 결코 끝나지 않는 루프에서는 reject
을 계속 반복합니다. 새로운 A
메시지가 들어 오면 거의 즉시 소비 된 다음 프로세스는 A
을 소비하려고하는 루프를 다시 시작합니다.
나는이는 새앙 토끼의 문서에서 Twisted Consumer Example에 약간의 수정으로 수행
import pika
from pika import exceptions
from pika.adapters import twisted_connection
from twisted.internet import defer, reactor, protocol,task
@defer.inlineCallbacks
def run(connection):
channel = yield connection.channel()
exchange = yield channel.exchange_declare(exchange='topic_link',type='topic')
queue = yield channel.queue_declare(queue='hello', auto_delete=False, exclusive=False)
yield channel.queue_bind(exchange='topic_link',queue='hello',routing_key='hello.world')
#yield channel.basic_qos(prefetch_count=1)
queue_object, consumer_tag = yield channel.basic_consume(queue='hello',no_ack=False)
l = task.LoopingCall(read, queue_object)
l.start(0.01)
@defer.inlineCallbacks
def read(queue_object):
ch,method,properties,body = yield queue_object.get()
print body
if body == 'A':
yield ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
else:
yield ch.basic_ack(delivery_tag=method.delivery_tag)
parameters = pika.ConnectionParameters()
cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters)
d = cc.connectTCP('hostname', 5672)
d.addCallback(lambda protocol: protocol.ready)
d.addCallback(run)
reactor.run()
문제점 : 참고 다음 주석 처리 된 라인 :
#yield channel.basic_qos(prefetch_count=1)
내가 그 주석을 해제 소비자가 메시지 A
에 도달하면 reject
이후에 즉시 다시 선택하고 그 뒤에있는 대기열에서 대기중인 다른 항목은 무시합니다. 거부 된 항목을 대기열의 꼬리에 배치하는 대신 계속해서 다시 시도하여 계속 대기열의 모든 항목을 완전히 차단합니다.
줄을 주석으로 처리하면 제대로 작동합니다 (조금 더 느리지 만). 회선이 있고 prefetch_count > 1
인 경우에도 작동합니다. 정확히 1
으로 설정하면 문제가 발생합니다.
메시지 거부에 누락 된 단계가 있습니까? A
? 아니면 Pika의 선불 시스템이 근본적인이 사건과 호환되지 않습니까?
동일한 메시지를 반복적으로 반복함으로써 CPU 시간이 소모되지 않는다고 가정 해 봅니다. 프리 페치 대기열에서 메시지를 거부하여 모든 메시지가 루프가 돌아 오기 전에 통과 할 수 있도록 할 수 있습니까? – smitelli
유효하지 않은 메시지를 완전히 삭제하고 [Dead Lettering] (http://www.rabbitmq.com/dlx.html)을 사용하여 다른 큐에 넣으면 나중에 수동으로 또는 다른 방법으로 처리 할 수 있습니다. 메시지 흐름이 얼마나 활발한 지 – pinepain