2009-11-27 3 views
3

py-amqplib을 사용하여 Python의 RabbitMQ에 액세스하고 있습니다. 응용 프로그램은 때때로 특정 MQ 주제를 청취하라는 요청을 수신합니다.py-amqplib을 사용하여 여러 대기열에서 메시지를 대기하는 방법

그것에는 AMQP 연결하고 채널을 생성하고 메시지를 수신하도록 새로운 스레드 시작 이러한 요청을 수신하고, 상기 제 1 시간 :

connection = amqp.Connection(host = host, userid = "guest", password = "guest", virtual_host = "/", insist = False) 
    channel = connection.channel() 

    listener = AMQPListener(channel) 
    listener.start() 

AMQPListener 매우 간단

class AMQPListener(threading.Thread): 
    def __init__(self, channel): 
     threading.Thread.__init__(self) 
     self.__channel = channel 

    def run(self): 
     while True: 
      self.__channel.wait() 

연결을 만든 후 다음과 같이 관심 주제에 구독합니다.

channel.queue_declare(queue = queueName, exclusive = False) 
channel.exchange_declare(exchange = MQ_EXCHANGE_NAME, type = "direct", durable = False, auto_delete = True) 
channel.queue_bind(queue = queueName, exchange = MQ_EXCHANGE_NAME, routing_key = destination) 

def receive_callback(msg): 
    self.queue.put(msg.body) 

channel.basic_consume(queue = queueName, no_ack = True, callback = receive_callback) 

처음으로이 모든 것이 정상적으로 작동합니다. 그러나 다른 주제를 구독하는 이후 요청시 실패합니다. 후속 요청에서 AMQP 연결과 AMQPListener 스레드를 다시 사용합니다 (각 항목에 대해 새 스레드를 시작하지 않으므로). 채널 위의 코드 블록을 호출하면 channel.queue_declare() 메서드 호출이 반환되지 않습니다. 그 시점에서 새 채널을 만들려고했는데 connection.channel() 호출도 반환하지 않습니다.

내가 작동하도록 할 수 있었던 유일한 방법은 주제별로 새로운 연결, 채널 및 수신기 스레드 (예 : routing_key)를 만드는 것이지만 실제로는 이상적이지 않습니다. 나는 그것이 어떻게 든 전체 연결을 차단하는 wait() 메소드라고 의심하지만, 그것에 대해 어떻게 해야할지 잘 모르겠습니다. 분명히 단일 리스너 스레드를 사용하여 여러 라우팅 키 (또는 여러 채널)를 통해 메시지를 수신 할 수 있어야합니까?

관련 질문입니다. 해당 주제가 더 이상 필요하지 않을 때 수신기 스레드를 중단 하시겠습니까? 메시지가 없으면 channel.wait() 호출이 영구적으로 차단 된 것처럼 보입니다. 내가 생각할 수있는 유일한 방법은 더미 메시지를 큐에 보내 "독"하게하는 것입니다. 리스너에 의해 멈추는 신호로 해석됩니다.

답변

1

은 채널 당 하나 이상의 comsumer 그냥 basic_consume를 사용하여 다른 하나()를 부착 한 후 channel.wait()를 사용합니다. basic_consume()을 통해 연결된 모든 대기열을 수신합니다. basic_consume() 각각에 대해 서로 다른 소비자 태그를 정의해야합니다.

대기열의 특정 사용자를 취소 (특정 주제를 취소)하려는 경우 channel.basic_cancel (consumer_tag)을 사용하십시오.

+0

감사합니다. 그러나 문제는 언제든지 새로운 주제를 구독해야 할 수도 있다는 것입니다. 'channel.wait()'이 호출 된 후. – EMP

관련 문제