2011-10-05 4 views
3

큰 응용 프로그램의 일부로 여러 작업자에 대한 기본 요청 속도 제한을 설정해야합니다. 이 아이디어는 오히려 간단합니다. "즉시"플래그로 "토큰"메시지를 게시하면 아무도 기다리지 않으면이 메시지가 자동으로 삭제됩니다. 작업자가 나가는 요청을 보내기 바로 전에 토큰 대기열에만 가입하도록하면 토큰을 "저장"하지 않고 각 토큰을 한 번만 사용할 수 있습니다. 나는 이것을 다소 우아한 것으로 생각했다.이 AMQP 단일 메시지 가입자를 안정하게하려면 어떻게합니까?

불행히도 가입자를 추가하고 제거하는 것이 완전히 안정적이지는 않습니다. 전체 예제를 https://gist.github.com/1263921/ebdafa067ca09514183d3fc5d6e43c7094fc2733에 설치했습니다. 코드는 다음과 같습니다 :

  1. amq-client-0.8.3/lib/amq/client/async/consumer.rb:246:in `block in <class:Consumer>': undefined method `handle_delivery' for nil:NilClass (NoMethodError)

  2. amq-client-0.8.3/lib/amq/client/async/adapter.rb:244:in `send_frame': Trying to send frame through a closed connection. Frame is #<AMQ::Protocol::MethodFrame:0x007fa6d29a35f0 @payload="\x00<\x00(\x00\x00\x00\x1Ftest.rapid-queue-unsubscription\x02", @channel=1> (AMQ::Client::ConnectionClosedError)

:

require 'bundler' 
Bundler.setup 

require 'amqp' 

puts "single-message consumer listening to rapid producer" 

QUEUE_NAME = 'test.rapid-queue-unsubscription' 
PRODUCE_RATE = 1.0/10 
CONSUME_RATE = 1.0/9 

def start_producer 
    exchange = AMQP::Exchange.new(AMQP::Channel.new, :direct, "") 

    n = 0 
    EM::PeriodicTimer.new(PRODUCE_RATE) do 
    message = "msg #{n}" 
    exchange.publish(message, 
        :immediate => true, # IMPORTANT, messages are dropped if nobody listening now 
        :routing_key => QUEUE_NAME) 
    puts "> PUT #{message}" 
    n += 1 
    end 
end 

def start_consumer 

    EM::PeriodicTimer.new(CONSUME_RATE) do 

    started = Time.now 
    AMQP::Channel.new do |channel_consumer| 
     channel_consumer.prefetch(1) 
     tick_queue = channel_consumer.queue(QUEUE_NAME) 

     consumer = AMQP::Consumer.new(channel_consumer, tick_queue, nil, exclusive = false, no_ack = true) 
     consumer.on_delivery do |_, message| 

     took = Time.now - started 
     puts "< GET #{message} [waited #{took.round(2)}s][#{(1.0/took).round(2)} reqs/sec]" 

     consumer.cancel 
     channel_consumer.close 
     end 
     consumer.consume 
    end 
    end 
end 

EM.run do 
    EM.set_quantum(50) 

    start_producer 
    start_consumer 
end 

몇 분에 대한 예는 두 개의 오류 중 하나가 죽어 끝나는 것을 실행 첫 번째 오류는 구독자가 삭제 되었기 때문입니다. bu 메시지는 여전히 전달되고 amq-client 라이브러리는 이런 일이 일어나지 않을 것이라고 절대 예상하지 않습니다. 두 번째 오류는 갑자기 닫힌 연결이있는 게시자에서 발생합니다.

예상치 못한 문제가 발생하지 않도록하려면 어떻게해야합니까? 사용

버전 :

  • OS X 10.7.1
  • 루비 1.9.2p312 (2011-08-11 수요일 수정 32,926) x86_64에-darwin11.1.0]
  • RabbitMQ 2.6.1

Gemfile :

source 'http://rubygems.org' 

gem 'amqp' 

Gemfile.lock 다음 #rabbitmq 채널에서

GEM 
    remote: http://rubygems.org/ 
    specs: 
    amq-client (0.8.3) 
     amq-protocol (>= 0.8.0) 
     eventmachine 
    amq-protocol (0.8.1) 
    amqp (0.8.0) 
     amq-client (~> 0.8.3) 
     amq-protocol (~> 0.8.0) 
     eventmachine 
    eventmachine (0.12.10) 

PLATFORMS 
    ruby 

DEPENDENCIES 
    amqp 
    eventmachine 

답변

2

(AMQP 저자 antares_) : 단 하나의 채널을 사용, 그리고 그것을 잘 작동합니다. 약간 변경되었지만 안정 버전 :

require 'bundler' 
Bundler.setup 

require 'amqp' 

puts "single-message consumer listening to rapid producer" 

QUEUE_NAME = 'test.rapid-queue-unsubscription' 
PRODUCE_RATE = 1.0/10 
CONSUME_RATE = 1.0/9 

def start_producer channel 
    exchange = AMQP::Exchange.new(channel, :direct, "") 

    n = 0 
    EM::PeriodicTimer.new(PRODUCE_RATE) do 
    message = "msg #{n}" 
    exchange.publish(message, 
        :immediate => true, # IMPORTANT, messages are dropped if nobody listening now 
        :routing_key => QUEUE_NAME) 
    puts "> PUT #{message}" 
    n += 1 
    end 
end 

def start_consumer channel 
    EM::PeriodicTimer.new(CONSUME_RATE) do 

    started = Time.now 
    tick_queue = channel.queue(QUEUE_NAME) 

    consumer = AMQP::Consumer.new(channel, tick_queue, nil, exclusive = false, no_ack = true) 
    consumer.on_delivery do |_, message| 

     took = Time.now - started 
     puts "< GET #{message} [waited #{took.round(2)}s][#{(1.0/took).round(2)} reqs/sec]" 

     consumer.cancel do 
     puts "< GET #{message} (CANCEL DONE)" 
     end 
    end 
    consumer.consume 
    end 
end 

EM.run do 
    EM.set_quantum(50) 

    AMQP::Channel.new do |channel| 
    start_producer channel 
    end 

    AMQP::Channel.new do |channel| 
    channel.prefetch(1) 
    start_consumer channel 
    end 

end 
+0

답변에서 "하나의 채널 만 사용하십시오."라고 말했지만 여전히 2 개를 만들었습니까? 나는 그것을 얻지 못한다 ... 나는 단지 하나의 채널을 만들었지 만이 오류가 발생한다. –

+0

채널은주기적인 타이머 루프 외부에서 선언되고 각 틱마다 재사용됩니다. 실제로 제작자와 소비자를위한 채널이 있지만 AMQP 행복에 필수적입니다. –

관련 문제