2013-03-08 2 views
1

나는 성취해야 할 흥미로운 상황이 있습니다. AMQP 대기열에있는 메시지를 기다리고 대기하는 EventMachine 루프가 있어야하지만 정기적 인 간격으로 별도의 AMQP 대기열에 메시지를 보내려면 해당 루프를 인터럽트합니다. 저는 EventMachine을 처음 접했고 이것은 EventMachine 루프가 필요한 메시지를 보내지 않는다는 것을 제외하고는 지금까지 가지고있는 것입니다.AMQP 멀티 태스킹

지금 내가이 만든이 발동 : 다음

listen_loop = Proc.new { 
     AMQP.start(connection_config) do |connection| 
      AMQP::Channel.new(connection) do |channel| 
       channel.queue("queue1", :exclusive => false, :durable => true) do |requests_queue| 
        requests_queue.once_declared do 
         consumer = AMQP::Consumer.new(channel, requests_queue).consume 
         consumer.on_delivery do |metadata, payload| 
          puts "[requests] Got a request #{metadata.message_id}. Sending a reply to #{metadata.reply_to}..." 
          response = "responding" 
          channel.default_exchange.publish(response, 
           :routing_key => metadata.reply_to, 
           :correlation_id => metadata.message_id, 
           :mandatory  => true) 
          metadata.ack 
         end 
        end 
       end 
      end 
     end 
     Signal.trap("INT") { AMQP.stop { EM.stop } } 
     Signal.trap("TERM") { AMQP.stop { EM.stop } } 
    } 

    send_message = Proc.new { 
     AMQP.start(connection_config) do |connection| 
      channel = AMQP::Channel.new(connection) 
      queue = channel.queue('queue2') 

      channel.default_exchange.publish("hello world", :routing_key => queue.name) 
      EM.add_timer(0.5) do 
       connection.close do 
        EM.stop{ exit } 
       end 
      end 
     end 
    } 

그리고 내 EventMachine 루프가 있습니다

EM.run do 
     EM.add_periodic_timer(5) { send_message.call } 
     listen_loop.call 
    end 

나는 듣고 루프에서 메시지를 수신 할 수 있어요하지만 난 드릴 수 없습니다 일정한 간격으로 메시지를 보내십시오.

답변

0

내가 잘못하고있는 것을 파악했습니다. 메시지 루프가 이미 연결되었으므로 RabbitMQ 서버에 대한 새 연결을 열 수 없습니다. 모든 것을 단일 EventMachine 루프에 통합하고 연결을 재사용하면 작동합니다. 호기심 사람들을 위해

그것은 다음과 같습니다

EM.run do 

    AMQP.start(connection_config) do |connection| 
     channel = AMQP::Channel.new(connection) 

     EM.add_periodic_timer(5) { channel.default_exchange.publish("foo", :routing_key => 'queue2') } 

     queue = channel.queue("queue1", :exclusive => false, :durable => true) 
     channel.prefetch(1) 
     queue.subscribe(:ack => true) do |metadata, payload| 
      puts "[requests] Got a request #{metadata.message_id}. Sending a reply to #{metadata.reply_to}..." 
      response = "bar" 
      channel.default_exchange.publish(response, 
       :routing_key => metadata.reply_to, 
       :correlation_id => metadata.message_id, 
       :mandatory  => true) 
      metadata.ack 
     end 
    end 
    Signal.trap("INT") { AMQP.stop { EM.stop } } 
    Signal.trap("TERM") { AMQP.stop { EM.stop } } 

end