2011-11-25 6 views
0

나는 RabbitMQ에서 머리를 되 찾으려고합니다. 하나의 큐를 수신 대기시키고 메시지를 수신하면 복수 메시지가있는 reply_to 헤더를 통해 지정된 익명 큐에 응답해야합니다.RabbitMQ 및 EventMachine :: defer

desc "start_consumer", "start the test consumer" 
def start_consumer 
    puts "Running #{AMQP::VERSION} version of the gem" 

     AMQP.start(:host => "localhost", :user => "guest", :password => "guest", :vhost => "/", 
         :logging => true, :port => 5672) do |connection| 

     channel = AMQP::Channel.new(connection) 

     requests_queue = channel.queue("one", :exclusive => true, :auto_delete => true) 

     Signal.trap("INT") do 
      connection.close do 
      EM.stop{exit} 
      end 
     end 

     channel.prefetch(1) 

     requests_queue.subscribe(:ack => true) do |header, body| 
      puts "received in server #{body.inspect}" 

      (0..5).each do |n| 
      header.ack 

      reply = {:reply => "Respone #{n}", :is_last => (n == 5)} 

      AMQP::Exchange.default.publish(
              MultiJson.encode(reply), 
              :routing_key => header.reply_to, 
              :correlation_id => header.correlation_id 
             ) 


      sleep(2) 
      end 
     end 

     puts " [x] Awaiting RPC requests" 
     end   
    end 

내 호출 코드는 다음과 같습니다 :

def publish(urlSearch, routing_key) 
    EM.run do 

     corr_id = rand(10_000_000).to_s 

     requests ||= Hash.new 

     connection = AMQP.connect(:host => "localhost") 

     callback_queue = AMQP::Channel.new(connection).queue("", :exclusive => true)    

     callback_queue.subscribe do |header, body| 

     reply = MultiJson.decode(body)    

     if reply[:is_last.to_s] 
      connection.close do 
      EM.stop{exit} 
      end 
     end 
     end       

     callback_queue.append_callback(:declare) do 
     AMQP::Exchange.default.publish(MultiJson.encode(urlSearch), :routing_key => routing_key, :reply_to => callback_queue.name, :correlation_id => corr_id) 
     end 

    end 

문제 메시지가 아니라는 것이다

는 지금까지 소비자와 reply_to 메시지에 대한 가입자 모두가 다음과 같은 작업을 반복의 모든 메시지가 게시 될 때까지 전송됩니다.

루프가 완료된 후 (0..5) 루프가 완료되면 모든 메시지가 게시됩니다.

누군가가 EventMachine::defer을 사용하는 것이 옵션 일 수 있다고 말했지만이 방법을 루프에 적용하는 방법을 모르겠습니다.

누구든지이 문제를 해결하는 방법에 대한 포인터를 제안 할 수 있습니까? EventMachine::defer은 좋은 옵션입니까? 그렇다면 AMQP.start을 사용할 때 어떻게해야합니까?

답변

0

EM과 작업 할 때 기억해야 할 것은 원자로 자체가 단일 스레드라는 것입니다. 루프가 끝나기 전에 루프가 끝나기를 기다리는 이유는 루프가 단일 스레드를 차지하기 때문입니다. (당신은 정말로 정말로 EM 응용 프로그램에서 수면을 사용하고 싶지 않습니다. 전체 반응기를 차단하고 아무 일도 일어나지 않습니다.)

메시지를 게시하거나 다른 모든 EM 행동 (타이머 발사, 다른 데이터 수신 등)은 원자로 루프를시켜야합니다.

EM::Iterator.new(0..5).each do |n, iter| 
    reply = {:reply => "Response #{n}", :is_last => (n == 5)} 
    AMQP::Exchange.default.publish(MultiJson.encode(reply), 
           :routing_key => header.reply_to, 
           :correlation_id => header.correlation_id) 
    iter.next 
end 
header.ack 
+0

내가 EM.defer https://gist.github.com/1480745를 사용하여 종료 :

당신은, 당신을 위해 뭔가를 작업을 예약 할 EM :: 반복자 같은 것을 사용할 수 있습니다. 내가 JRuby를 사용하고 있는데 아마도 자바 스레드 사용에 대해 생각해야합니다. – dagda1