2013-07-06 2 views
0

루비 애플리케이션에서 나는 아무 상태도 공유하지 않는 작업들을 많이 가지고 있으며 한 번에 여러 개를 시작하려고합니다. 결정적으로, 나는 시작한 순서 나 반환 값에 신경 쓰지 않는다. (완료되기 전에 각각 데이터베이스 트랜잭션이 발생하기 때문에). 내 루비 구현에 따라 GIL이 실제로 동시에 실행되는 작업을 막을 수는 있지만 실제로는 동시성에 관심이 없기 때문에 괜찮습니다. 이러한 작업자 스레드는 네트워크 요청에 대해 IO 바인딩이됩니다. 내가 지금까지있어 무엇루비의 IO 바운드 스레드

this입니다 :

def asyncDispatcher(numConcurrent, stateQueue, &workerBlock) 
    workerThreads = [] 

    while not stateQueue.empty? 
    while workerThreads.length < numConcurrent 
     nextState = stateQueue.pop 

     nextWorker = 
     Thread.new(nextState) do |st| 
      workerBlock.call(st) 
     end 

     workerThreads.push(nextWorker) 
    end # inner while 

    workerThreads.delete_if{|th| not th.alive?} # clean up dead threads 
    end # outer while 

    workerThreads.each{|th| th.join} # join any remaining workers 
end # asyncDispatcher 

그리고 나는 이런 식으로 호출 :

asyncDispatcher(2, (1..10).to_a) {|x| x + 1} 

에 여기 숨어 버그 나 동시성 함정이 있습니까? 아니면이 작업을 단순화하는 런타임의 무언가입니까?

+1

celluloid.io를보십시오. 주 루프가 너무 바빠서 기다리는 것처럼 보입니다. –

+0

++는 바쁜 대기 중입니다. : 'workerThreads.length 7stud

+0

전에 작업자 스레드가 자고 작업 스레드가 완료되면 잠에서 깨어나지만이 접근법은 많은 코너 케이스를 가져야하고 더 많은 복잡한 잠금이 필요했습니다. –

답변

2

큐 사용, 그 출력에서 ​​

require 'thread' 

def asyncDispatcher(numWorkers, stateArray, &processor) 
    q = Queue.new 
    threads = [] 

    (1..numWorkers).each do |worker_id| 
    threads << Thread.new(processor, worker_id) do |processor, worker_id| 
     while true 
     next_state = q.shift  #shift() blocks if q is empty, which is the case now 
     break if next_state == q #Some sentinel that won't appear in your data 
     processor.call(next_state, worker_id) 
     end 
    end 
    end 

    stateArray.each {|state| q.push state} 
    stateArray.each {q.push q}  #Some sentinel that won't appear in your data 

    threads.each(&:join) 
end 


asyncDispatcher(2, (1..10).to_a) do |state, worker_id| 
    time = sleep(Random.rand 10) #How long it took to process state 
    puts "#{state} is finished being processed: worker ##{worker_id} took #{time} secs." 
end 

--output:-- 
2 is finished being processed: worker #1 took 4 secs. 
3 is finished being processed: worker #1 took 1 secs. 
1 is finished being processed: worker #2 took 7 secs. 
5 is finished being processed: worker #2 took 1 secs. 
6 is finished being processed: worker #2 took 4 secs. 
7 is finished being processed: worker #2 took 1 secs. 
4 is finished being processed: worker #1 took 8 secs. 
8 is finished being processed: worker #2 took 1 secs. 
10 is finished being processed: worker #2 took 3 secs. 
9 is finished being processed: worker #1 took 9 secs. 

좋아, 좋아, 사람이가는 모양을하고 부르짖

이봐, # 2했다 네 일을 13 개의 초 총 # 1 은 8 초 밖에 걸리지 않았습니다. 작업에 대해서는 8 초 동안 # 1의 출력이 필요합니다. 직장이 일찍 왔어 야합니다. 루비에서 스레드 전환이 없습니다! 루비가 고장 입니다! ".

음, # 1 5 초 총 처음이 작업에 자고있는 동안, # 2 그래서, 같은 시간에 2 번을 자고 있었다 단지 2 초 이상은 떠났다 # 1이 처음 두 작업을 마쳤을 때 잠을 자려고 했으므로 # 2의 7 초를 2 초로 교체하면 # 1이 첫 번째 두 작업을 마친 후 # 2 작업이 총 4 초 동안 총 8 초 걸렸음을 알 수 있습니다 8 초 작업을 위해 # 1을 묶어 놓은 연속 작업.

+0

대신 서재를 사용하면 대기열이 비어 있으면 휴식 할 수 있습니까? –

+1

지금 내 생각에 결코 신경 쓰지 마라 : 당신은 다음과 같이 구분할 수있는 방법이 필요하다 : '더 기다려야 할 일이 더있다.'그리고 '모든 일이 끝났다.' –

관련 문제