py-amqplib을 사용하여 Python의 RabbitMQ에 액세스하고 있습니다. 응용 프로그램은 때때로 특정 MQ 주제를 청취하라는 요청을 수신합니다.py-amqplib을 사용하여 여러 대기열에서 메시지를 대기하는 방법
그것에는 AMQP 연결하고 채널을 생성하고 메시지를 수신하도록 새로운 스레드 시작 이러한 요청을 수신하고, 상기 제 1 시간 :
connection = amqp.Connection(host = host, userid = "guest", password = "guest", virtual_host = "/", insist = False)
channel = connection.channel()
listener = AMQPListener(channel)
listener.start()
AMQPListener 매우 간단
class AMQPListener(threading.Thread):
def __init__(self, channel):
threading.Thread.__init__(self)
self.__channel = channel
def run(self):
while True:
self.__channel.wait()
연결을 만든 후 다음과 같이 관심 주제에 구독합니다.
channel.queue_declare(queue = queueName, exclusive = False)
channel.exchange_declare(exchange = MQ_EXCHANGE_NAME, type = "direct", durable = False, auto_delete = True)
channel.queue_bind(queue = queueName, exchange = MQ_EXCHANGE_NAME, routing_key = destination)
def receive_callback(msg):
self.queue.put(msg.body)
channel.basic_consume(queue = queueName, no_ack = True, callback = receive_callback)
처음으로이 모든 것이 정상적으로 작동합니다. 그러나 다른 주제를 구독하는 이후 요청시 실패합니다. 후속 요청에서 AMQP 연결과 AMQPListener 스레드를 다시 사용합니다 (각 항목에 대해 새 스레드를 시작하지 않으므로). 채널 위의 코드 블록을 호출하면 channel.queue_declare() 메서드 호출이 반환되지 않습니다. 그 시점에서 새 채널을 만들려고했는데 connection.channel() 호출도 반환하지 않습니다.
내가 작동하도록 할 수 있었던 유일한 방법은 주제별로 새로운 연결, 채널 및 수신기 스레드 (예 : routing_key)를 만드는 것이지만 실제로는 이상적이지 않습니다. 나는 그것이 어떻게 든 전체 연결을 차단하는 wait() 메소드라고 의심하지만, 그것에 대해 어떻게 해야할지 잘 모르겠습니다. 분명히 단일 리스너 스레드를 사용하여 여러 라우팅 키 (또는 여러 채널)를 통해 메시지를 수신 할 수 있어야합니까?
관련 질문입니다. 해당 주제가 더 이상 필요하지 않을 때 수신기 스레드를 중단 하시겠습니까? 메시지가 없으면 channel.wait() 호출이 영구적으로 차단 된 것처럼 보입니다. 내가 생각할 수있는 유일한 방법은 더미 메시지를 큐에 보내 "독"하게하는 것입니다. 리스너에 의해 멈추는 신호로 해석됩니다.
감사합니다. 그러나 문제는 언제든지 새로운 주제를 구독해야 할 수도 있다는 것입니다. 'channel.wait()'이 호출 된 후. – EMP