Ruby 용 ThreadPool 구현을 찾을 수 없으므로 여기에서 코드를 기반으로 부분적으로는 http://snippets.dzone.com/posts/show/3276을 작성했지만 ThreadPool 종료에 대한 신호 및 기타 구현이 변경되었습니다. (100 개 스레드를 가진 약 1,300 작업을 처리)를 실행, 그것은 라인 (25)에 교착 상태로 죽는다 -. 그것은이 새 작업을 기다리는 모든 아이디어는 왜 일어날 수ThreadPool에서 교착 상태가 발생했습니다
require 'thread'
begin
require 'fastthread'
rescue LoadError
$stderr.puts "Using the ruby-core thread implementation"
end
class ThreadPool
class Worker
def initialize(callback)
@mutex = Mutex.new
@cv = ConditionVariable.new
@callback = callback
@mutex.synchronize {@running = true}
@thread = Thread.new do
while @mutex.synchronize {@running}
block = get_block
if block
block.call
reset_block
# Signal the ThreadPool that this worker is ready for another job
@callback.signal
else
# Wait for a new job
@mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25?
end
end
end
end
def name
@thread.inspect
end
def get_block
@mutex.synchronize {@block}
end
def set_block(block)
@mutex.synchronize do
raise RuntimeError, "Thread already busy." if @block
@block = block
# Signal the thread in this class, that there's a job to be done
@cv.signal
end
end
def reset_block
@mutex.synchronize {@block = nil}
end
def busy?
@mutex.synchronize {[email protected]?}
end
def stop
@mutex.synchronize {@running = false}
# Signal the thread not to wait for a new job
@cv.signal
@thread.join
end
end
attr_accessor :max_size
def initialize(max_size = 10)
@max_size = max_size
@workers = []
@mutex = Mutex.new
@cv = ConditionVariable.new
end
def size
@mutex.synchronize {@workers.size}
end
def busy?
@mutex.synchronize {@workers.any? {|w| w.busy?}}
end
def shutdown
@mutex.synchronize {@workers.each {|w| w.stop}}
end
alias :join :shutdown
def process(block=nil,&blk)
block = blk if block_given?
while true
@mutex.synchronize do
worker = get_worker
if worker
return worker.set_block(block)
else
# Wait for a free worker
@cv.wait(@mutex)
end
end
end
end
# Used by workers to report ready status
def signal
@cv.signal
end
private
def get_worker
free_worker || create_worker
end
def free_worker
@workers.each {|w| return w unless w.busy?}; nil
end
def create_worker
return nil if @workers.size >= @max_size
worker = Worker.new(self)
@workers << worker
worker
end
end
1. @workers에 대한 액세스가 동기화되어서는 안됩니까? 2. 작업자 스레드에서 여전히 잠금 및 잠금 해제가 필요한 이유는 무엇입니까? – Roman
작업자에 대한 액세스는 항상 동일한 스레드에서 수행되므로 동기화가 필요하지 않습니다. 작업자 스레드의 잠금은 스레드를 안전하게 깨우기 위해 필요합니다. – PierreBdR
여전히 문제가 있습니다. 교착 상태가 발생할 수 있습니다. 작업자 스레드가 자신을 큐에 추가하면 ThreadPool은 큐에서 큐를 가져 와서 작업을 할당 할 수 있습니다. 이 경우 신호가 전송됩니다. 그러나 작업자 스레드가 cv에서 대기 중이 아니면 신호가 손실됩니다. – Roman