큰 응용 프로그램의 일부로 여러 작업자에 대한 기본 요청 속도 제한을 설정해야합니다. 이 아이디어는 오히려 간단합니다. "즉시"플래그로 "토큰"메시지를 게시하면 아무도 기다리지 않으면이 메시지가 자동으로 삭제됩니다. 작업자가 나가는 요청을 보내기 바로 전에 토큰 대기열에만 가입하도록하면 토큰을 "저장"하지 않고 각 토큰을 한 번만 사용할 수 있습니다. 나는 이것을 다소 우아한 것으로 생각했다.이 AMQP 단일 메시지 가입자를 안정하게하려면 어떻게합니까?
불행히도 가입자를 추가하고 제거하는 것이 완전히 안정적이지는 않습니다. 전체 예제를 https://gist.github.com/1263921/ebdafa067ca09514183d3fc5d6e43c7094fc2733에 설치했습니다. 코드는 다음과 같습니다 :
amq-client-0.8.3/lib/amq/client/async/consumer.rb:246:in `block in <class:Consumer>': undefined method `handle_delivery' for nil:NilClass (NoMethodError)
amq-client-0.8.3/lib/amq/client/async/adapter.rb:244:in `send_frame': Trying to send frame through a closed connection. Frame is #<AMQ::Protocol::MethodFrame:0x007fa6d29a35f0 @payload="\x00<\x00(\x00\x00\x00\x1Ftest.rapid-queue-unsubscription\x02", @channel=1> (AMQ::Client::ConnectionClosedError)
:
require 'bundler'
Bundler.setup
require 'amqp'
puts "single-message consumer listening to rapid producer"
QUEUE_NAME = 'test.rapid-queue-unsubscription'
PRODUCE_RATE = 1.0/10
CONSUME_RATE = 1.0/9
def start_producer
exchange = AMQP::Exchange.new(AMQP::Channel.new, :direct, "")
n = 0
EM::PeriodicTimer.new(PRODUCE_RATE) do
message = "msg #{n}"
exchange.publish(message,
:immediate => true, # IMPORTANT, messages are dropped if nobody listening now
:routing_key => QUEUE_NAME)
puts "> PUT #{message}"
n += 1
end
end
def start_consumer
EM::PeriodicTimer.new(CONSUME_RATE) do
started = Time.now
AMQP::Channel.new do |channel_consumer|
channel_consumer.prefetch(1)
tick_queue = channel_consumer.queue(QUEUE_NAME)
consumer = AMQP::Consumer.new(channel_consumer, tick_queue, nil, exclusive = false, no_ack = true)
consumer.on_delivery do |_, message|
took = Time.now - started
puts "< GET #{message} [waited #{took.round(2)}s][#{(1.0/took).round(2)} reqs/sec]"
consumer.cancel
channel_consumer.close
end
consumer.consume
end
end
end
EM.run do
EM.set_quantum(50)
start_producer
start_consumer
end
몇 분에 대한 예는 두 개의 오류 중 하나가 죽어 끝나는 것을 실행 첫 번째 오류는 구독자가 삭제 되었기 때문입니다. bu 메시지는 여전히 전달되고
amq-client
라이브러리는 이런 일이 일어나지 않을 것이라고 절대 예상하지 않습니다. 두 번째 오류는 갑자기 닫힌 연결이있는 게시자에서 발생합니다.
예상치 못한 문제가 발생하지 않도록하려면 어떻게해야합니까? 사용
버전 :
- OS X 10.7.1
- 루비 1.9.2p312 (2011-08-11 수요일 수정 32,926) x86_64에-darwin11.1.0]
- RabbitMQ 2.6.1
Gemfile :
source 'http://rubygems.org'
gem 'amqp'
Gemfile.lock 다음 #rabbitmq 채널에서
GEM
remote: http://rubygems.org/
specs:
amq-client (0.8.3)
amq-protocol (>= 0.8.0)
eventmachine
amq-protocol (0.8.1)
amqp (0.8.0)
amq-client (~> 0.8.3)
amq-protocol (~> 0.8.0)
eventmachine
eventmachine (0.12.10)
PLATFORMS
ruby
DEPENDENCIES
amqp
eventmachine
답변에서 "하나의 채널 만 사용하십시오."라고 말했지만 여전히 2 개를 만들었습니까? 나는 그것을 얻지 못한다 ... 나는 단지 하나의 채널을 만들었지 만이 오류가 발생한다. –
채널은주기적인 타이머 루프 외부에서 선언되고 각 틱마다 재사용됩니다. 실제로 제작자와 소비자를위한 채널이 있지만 AMQP 행복에 필수적입니다. –