Java에서는 동시 패키지의 일부로 ArrayBlockingQueue라는 클래스가 있습니다. 이것은 스레드 안전성에 대해 걱정하지 않고 대기열에서 항목을 추가 및 제거 할 수있는 스레드 안전 클래스입니다. 이 클래스에는 항목을 대기열에 넣을 수있는 put
메소드가 있습니다. take
메서드는 대기열에서 항목을 제거합니다. put
과 take
에 대한 두 가지 중요한 점은 스레드 인터리빙과의 동기화를 위해 synchronized 키워드가 필요하지 않으며, 아무 것도 들어 있지 않으면 예외를 던지기보다는 무언가가 대기열에 추가 될 때까지 참을성있게 대기한다는 것입니다.루비에서 블로킹 큐 구현
나는 루비에서 비슷한을 구현하기 위해 노력하지만,이 문제는 다음과 같이 queue.pop
이 항목은 (적어도 대기열 중 하나) 큐에 추가하는 경우에도 차단하는 것입니다 :
require 'redis'
require 'date'
def log_debug(str)
debug_str = "#{DateTime.now} #{str}"
puts debug_str
end
class EmailsmsResponder
def initialize
@queue = Queue.new
end
# add to queue
def produce(channel, msg)
@queue << {channel: channel, msg: msg}
puts "queue size: #{@queue.size}"
end
# take from queue
def consume
loop do
log_debug "Whats going on??"
sleep(1)
if [email protected]?
item = @queue.pop
log_debug "removing channel #{item[:channel]} and msg #{item[:msg]} from email-sms thread from queue"
end
end
end
end
class SidekiqResponder
def initialize
@queue = Queue.new
end
def produce(channel, msg)
@queue << {channel: channel, msg: msg}
puts "queue size: #{@queue.size}"
end
def consume
loop do
log_debug "Whats going on??"
sleep(1)
if [email protected]?
value = @queue.pop
log_debug "removing channel #{item[:channel]} and msg #{item[:msg]} from sidekiq thread from queue"
end
end
end
end
class RedisResponder
def initialize(host,port)
@host = host
@port = port
@email_sms = EmailsmsResponder.new
@sidekiq = SidekiqResponder.new
# timeout so we wait for messages forever
@redis = Redis.new(:host => @host, :port => @port, :timeout => 0)
end
def start_producers
thread = Thread.new do
@redis.subscribe('juggernaut') do |on|
# message block fired for new messages
on.message do |channel, msg|
log_debug "New message"
@email_sms.produce(channel, msg)
@sidekiq.produce(channel, msg)
end
end
end
end
def start_consumers
thread = Thread.new do
@email_sms.consume
@sidekiq.consume
end
end
end
responder = RedisResponder.new('127.0.0.1', 6379)
responder.start_producers.join(responder.start_consumers.join)
하나의 대기열이 제대로 작동하고있는 것처럼 보이지만 다른 대기열에서는 아무것도 검색하지 않습니다.
$ ruby redis-client4.rb
2014-07-22T14:53:24-04:00 Whats going on??
2014-07-22T14:53:25-04:00 Whats going on??
2014-07-22T14:53:25-04:00 New message
queue size: 1
queue size: 1
2014-07-22T14:53:26-04:00 removing channel juggernaut and msg {"channels":["/reports/6561/new"],"data":"New reports for unit 6561"} from email-sms thread from queue
2014-07-22T14:53:26-04:00 Whats going on??
2014-07-22T14:53:27-04:00 Whats going on??
2014-07-22T14:53:28-04:00 Whats going on??
2014-07-22T14:53:28-04:00 New message
queue size: 1
queue size: 2
2014-07-22T14:53:29-04:00 removing channel juggernaut and msg {"channels":["/reports/6561/new"],"data":"New reports for unit 6561"} from email-sms thread from queue
2014-07-22T14:53:29-04:00 Whats going on??
2014-07-22T14:53:30-04:00 Whats going on??
2014-07-22T14:53:31-04:00 Whats going on??
2014-07-22T14:53:31-04:00 New message
queue size: 1
queue size: 3
2014-07-22T14:53:32-04:00 removing channel juggernaut and msg {"channels":["/reports/6561/new"],"data":"New reports for unit 6561"} from email-sms thread from queue
2014-07-22T14:53:32-04:00 Whats going on??
2014-07-22T14:53:33-04:00 Whats going on??
2014-07-22T14:53:34-04:00 Whats going on??
2014-07-22T14:53:34-04:00 New message
queue size: 1
queue size: 4
2014-07-22T14:53:35-04:00 removing channel juggernaut and msg {"channels":["/reports/6561/new"],"data":"New reports for unit 6561"} from email-sms thread from queue
2014-07-22T14:53:35-04:00 Whats going on??
2014-07-22T14:53:36-04:00 Whats going on??
2014-07-22T14:53:37-04:00 Whats going on??
2014-07-22T14:53:37-04:00 New message
queue size: 1
queue size: 5
내가 뭘 잘못하고 있을까요?
오타없이 질문이 업데이트되었습니다. – JohnMerlino