나는 RabbitMQ에서 머리를 되 찾으려고합니다. 하나의 큐를 수신 대기시키고 메시지를 수신하면 복수 메시지가있는 reply_to
헤더를 통해 지정된 익명 큐에 응답해야합니다.RabbitMQ 및 EventMachine :: defer
desc "start_consumer", "start the test consumer"
def start_consumer
puts "Running #{AMQP::VERSION} version of the gem"
AMQP.start(:host => "localhost", :user => "guest", :password => "guest", :vhost => "/",
:logging => true, :port => 5672) do |connection|
channel = AMQP::Channel.new(connection)
requests_queue = channel.queue("one", :exclusive => true, :auto_delete => true)
Signal.trap("INT") do
connection.close do
EM.stop{exit}
end
end
channel.prefetch(1)
requests_queue.subscribe(:ack => true) do |header, body|
puts "received in server #{body.inspect}"
(0..5).each do |n|
header.ack
reply = {:reply => "Respone #{n}", :is_last => (n == 5)}
AMQP::Exchange.default.publish(
MultiJson.encode(reply),
:routing_key => header.reply_to,
:correlation_id => header.correlation_id
)
sleep(2)
end
end
puts " [x] Awaiting RPC requests"
end
end
내 호출 코드는 다음과 같습니다 :
def publish(urlSearch, routing_key)
EM.run do
corr_id = rand(10_000_000).to_s
requests ||= Hash.new
connection = AMQP.connect(:host => "localhost")
callback_queue = AMQP::Channel.new(connection).queue("", :exclusive => true)
callback_queue.subscribe do |header, body|
reply = MultiJson.decode(body)
if reply[:is_last.to_s]
connection.close do
EM.stop{exit}
end
end
end
callback_queue.append_callback(:declare) do
AMQP::Exchange.default.publish(MultiJson.encode(urlSearch), :routing_key => routing_key, :reply_to => callback_queue.name, :correlation_id => corr_id)
end
end
문제 메시지가 아니라는 것이다
는 지금까지 소비자와 reply_to
메시지에 대한 가입자 모두가 다음과 같은 작업을 반복의 모든 메시지가 게시 될 때까지 전송됩니다.
루프가 완료된 후 (0..5)
루프가 완료되면 모든 메시지가 게시됩니다.
누군가가 EventMachine::defer
을 사용하는 것이 옵션 일 수 있다고 말했지만이 방법을 루프에 적용하는 방법을 모르겠습니다.
누구든지이 문제를 해결하는 방법에 대한 포인터를 제안 할 수 있습니까? EventMachine::defer
은 좋은 옵션입니까? 그렇다면 AMQP.start
을 사용할 때 어떻게해야합니까?
내가 EM.defer https://gist.github.com/1480745를 사용하여 종료 :
당신은, 당신을 위해 뭔가를 작업을 예약 할 EM :: 반복자 같은 것을 사용할 수 있습니다. 내가 JRuby를 사용하고 있는데 아마도 자바 스레드 사용에 대해 생각해야합니다. – dagda1