2014-07-22 4 views
1

Java에서는 동시 패키지의 일부로 ArrayBlockingQueue라는 클래스가 있습니다. 이것은 스레드 안전성에 대해 걱정하지 않고 대기열에서 항목을 추가 및 제거 할 수있는 스레드 안전 클래스입니다. 이 클래스에는 항목을 대기열에 넣을 수있는 put 메소드가 있습니다. take 메서드는 대기열에서 항목을 제거합니다. puttake에 대한 두 가지 중요한 점은 스레드 인터리빙과의 동기화를 위해 synchronized 키워드가 필요하지 않으며, 아무 것도 들어 있지 않으면 예외를 던지기보다는 무언가가 대기열에 추가 될 때까지 참을성있게 대기한다는 것입니다.루비에서 블로킹 큐 구현

나는 루비에서 비슷한을 구현하기 위해 노력하지만,이 문제는 다음과 같이 queue.pop이 항목은 (적어도 대기열 중 하나) 큐에 추가하는 경우에도 차단하는 것입니다 :

require 'redis' 
require 'date' 

def log_debug(str) 
    debug_str = "#{DateTime.now} #{str}" 
    puts debug_str 
end 

class EmailsmsResponder 
    def initialize 
    @queue = Queue.new 
    end 

    # add to queue 
    def produce(channel, msg) 
    @queue << {channel: channel, msg: msg} 
    puts "queue size: #{@queue.size}" 
    end 

    # take from queue 
    def consume 
    loop do 
     log_debug "Whats going on??" 
     sleep(1) 
     if [email protected]? 
      item = @queue.pop 
      log_debug "removing channel #{item[:channel]} and msg #{item[:msg]} from email-sms thread from queue" 
     end 
    end 
    end 
end 

class SidekiqResponder 

    def initialize 
    @queue = Queue.new 
    end 

    def produce(channel, msg) 
    @queue << {channel: channel, msg: msg} 
    puts "queue size: #{@queue.size}" 
    end 

    def consume 
    loop do 
     log_debug "Whats going on??" 
     sleep(1) 
     if [email protected]? 
     value = @queue.pop 
     log_debug "removing channel #{item[:channel]} and msg #{item[:msg]} from sidekiq thread from queue" 
     end 
    end 
    end 
end 

class RedisResponder 
    def initialize(host,port) 
    @host = host 
    @port = port 
    @email_sms = EmailsmsResponder.new 
    @sidekiq = SidekiqResponder.new 
    # timeout so we wait for messages forever 
    @redis = Redis.new(:host => @host, :port => @port, :timeout => 0) 
    end 

    def start_producers 
    thread = Thread.new do 
     @redis.subscribe('juggernaut') do |on| 
     # message block fired for new messages 
     on.message do |channel, msg| 
      log_debug "New message" 
      @email_sms.produce(channel, msg) 
      @sidekiq.produce(channel, msg) 
     end 
     end 
    end 
    end 

    def start_consumers 
    thread = Thread.new do 
     @email_sms.consume 
     @sidekiq.consume 
    end 
    end 
end 

responder = RedisResponder.new('127.0.0.1', 6379) 
responder.start_producers.join(responder.start_consumers.join) 

하나의 대기열이 제대로 작동하고있는 것처럼 보이지만 다른 대기열에서는 아무것도 검색하지 않습니다.

$ ruby redis-client4.rb 
2014-07-22T14:53:24-04:00 Whats going on?? 
2014-07-22T14:53:25-04:00 Whats going on?? 
2014-07-22T14:53:25-04:00 New message 
queue size: 1 
queue size: 1 
2014-07-22T14:53:26-04:00 removing channel juggernaut and msg {"channels":["/reports/6561/new"],"data":"New reports for unit 6561"} from email-sms thread from queue 
2014-07-22T14:53:26-04:00 Whats going on?? 
2014-07-22T14:53:27-04:00 Whats going on?? 
2014-07-22T14:53:28-04:00 Whats going on?? 
2014-07-22T14:53:28-04:00 New message 
queue size: 1 
queue size: 2 
2014-07-22T14:53:29-04:00 removing channel juggernaut and msg {"channels":["/reports/6561/new"],"data":"New reports for unit 6561"} from email-sms thread from queue 
2014-07-22T14:53:29-04:00 Whats going on?? 
2014-07-22T14:53:30-04:00 Whats going on?? 
2014-07-22T14:53:31-04:00 Whats going on?? 
2014-07-22T14:53:31-04:00 New message 
queue size: 1 
queue size: 3 
2014-07-22T14:53:32-04:00 removing channel juggernaut and msg {"channels":["/reports/6561/new"],"data":"New reports for unit 6561"} from email-sms thread from queue 
2014-07-22T14:53:32-04:00 Whats going on?? 
2014-07-22T14:53:33-04:00 Whats going on?? 
2014-07-22T14:53:34-04:00 Whats going on?? 
2014-07-22T14:53:34-04:00 New message 
queue size: 1 
queue size: 4 
2014-07-22T14:53:35-04:00 removing channel juggernaut and msg {"channels":["/reports/6561/new"],"data":"New reports for unit 6561"} from email-sms thread from queue 
2014-07-22T14:53:35-04:00 Whats going on?? 
2014-07-22T14:53:36-04:00 Whats going on?? 
2014-07-22T14:53:37-04:00 Whats going on?? 
2014-07-22T14:53:37-04:00 New message 
queue size: 1 
queue size: 5 

내가 뭘 잘못하고 있을까요?

답변

0

나는 루비에서 비슷한을 구현하기 위해 노력하지만, 문제는 queue.pop 항목이 쉽게

는 반증하기 위해 큐에 추가하는 경우에도 차단하는 것입니다 :

require 'thread' 

q = Queue.new 
q << 'hello' 

x = q.pop 
puts x 

x = q.pop 

--output:-- 
hello 

deadlock detected (fatal) 

내가 뭘 잘못하고 있니?

코드 삭제를 시작하고 문제가 발생한 지점을 정확히 파악하기 시작합니다. 정확히 똑같은 두 개의 클래스가 있다는 사실은 당신이 심지어 단순화하기 시작하지 않았다는 것을 의미합니다.

다음이있다 :

def consume 
    loop do 
     log_debug "Whats going on??" 
     sleep(1) 
     value = queue.pop 
     log_debug "removing channel: #{channel} msg: #{msg} of sidekiq thread from queue" 
    end 
    end 

***Error in `consume': undefined local variable or method `queue' 
+0

오타없이 질문이 업데이트되었습니다. – JohnMerlino

0

나는 아래의 코드 작업을 얻었다. 나는 4 개의 스레드를 사용하여 작업을해야한다는 사실을 싫어했다. 그래서 누군가가 더 좋은 솔루션을 가지고 있다면 솔루션을 추천하게되어 기뻤다. 하지만 지금 당장 작동하는 것 같습니다 :

require 'redis' 
require 'date' 

def log_debug(str) 
    debug_str = "#{DateTime.now} #{str}" 
    puts debug_str 
end 

class EmailsmsResponder 
    def initialize 
    @queue = Queue.new 
    end 

    # add to queue 
    def produce(channel, msg) 
    @queue << {channel: channel, msg: msg} 
    puts "queue size: #{@queue.size}" 
    end 

    # take from queue 
    def consume 
    loop do 
     log_debug "Whats going on??" 
     sleep(1) 
     if [email protected]? 
      item = @queue.pop 
      log_debug "removing channel #{item[:channel]} and msg #{item[:msg]} from email-sms thread from queue" 
     end 
    end 
    end 
end 

class SidekiqResponder 

    def initialize 
    @queue = Queue.new 
    end 

    def produce(channel, msg) 
    @queue << {channel: channel, msg: msg} 
    puts "queue size: #{@queue.size}" 
    end 

    def consume 
    loop do 
     log_debug "Whats going on??" 
     sleep(1) 
     if [email protected]? 
     item = @queue.pop 
     log_debug "removing channel #{item[:channel]} and msg #{item[:msg]} from sidekiq thread from queue" 
     end 
    end 
    end 
end 

class RedisResponder 
    def initialize(host,port) 
    @host = host 
    @port = port 
    @email_sms = EmailsmsResponder.new 
    @sidekiq = SidekiqResponder.new 
    # timeout so we wait for messages forever 
    @redis = Redis.new(:host => @host, :port => @port, :timeout => 0) 
    end 

    def start_producers 
    thread = Thread.new do 
     @redis.subscribe('juggernaut') do |on| 
     # message block fired for new messages 
     on.message do |channel, msg| 
      log_debug "New message" 
      @email_sms.produce(channel, msg) 
      @sidekiq.produce(channel, msg) 
     end 
     end 
    end 
    end 

    def start_consumers 
    thread = Thread.new do 
     t1 = Thread.new { @email_sms.consume } 
     t2 = Thread.new { @sidekiq.consume } 
     t1.join(t2.join) 
    end 
    end 
end 

responder = RedisResponder.new('127.0.0.1', 6379) 
responder.start_producers.join(responder.start_consumers.join) 
+0

다른 스레드 내부에서 소비자 스레드를 만드는 이유는 무엇입니까? 왜 작업 대기열과 작업을 실행하고 작업 결과를 결과 대기열에 넣는 작업 대기열을 사용하여 간단한 예제를 통해 작업하지 않았습니까? – 7stud