2014-11-14 3 views
2

나는 다음과 같은 코드를 가지고있다. 각 제작자에 대해 하나의 대기열. 소비자는 모든 대기열을 반복하며 소비 할 작업이 있는지 확인합니다. 그가 검사중인 대기열에 작업이 있으면 소비합니다. 그렇지 않으면, 그는 모든 대기열에 대해 반복 작업을 완료 할 때까지 다음 대기열을 확인합니다.자바 다중 큐 생산자 소비자

지금까지 모든 대기열을 반복하여 모두 비 었으면 그는 그 중 하나에 무언가를 포함하기를 기다리는 대신 루프를 계속합니다 (바깥 쪽 while 참조).

대기열 중 하나에 뭔가가있을 때까지 소비자를 대기 상태로 만들 수 있습니까?

다음 시나리오에 문제가 있습니다. 대기열이 2 개 밖에 없다고합시다. 소비자가 첫 번째 것을 확인했고 그것은 비어있었습니다. 두 번째 것 (비어 있음)을 확인하는 것처럼 프로듀서는 첫 번째 대기열에 무엇인가 넣습니다. 고객이 염려하는 한 대기열은 모두 비어 있으므로 기다려야합니다 (그 중 하나가 더 이상 비어 있지 않고 계속 반복해야 함).

편집 : 마지막으로 한 가지. 이것은 나를위한 운동입니다. 나 자신을 동기화 구현하려고 해요. 그래서 자바 라이브러리 중 하나가 이것을 구현하는 솔루션을 가지고 있다면 나는 그것에 관심이 없다. 나는 이것을 어떻게 구현할 수 있는지 이해하려고 노력하고있다.

+0

추가 신호/통지를하기 위해 생산자를 수정할 수있는 경우가 아니면 다음과 같은 두 가지 유형의 솔루션이 있습니다. 1) 시간 초과를 사용하여 CPU 소비와 응답 간의 균형을 찾거나 2) 대기열에서 대기중인 추가 스레드를 사용하고 소비자에게 알린다. 두 번째 접근법의 추가 스레드는 메모리를 많이 소비하지만 CPU를 기다리는 시간은 거의 없습니다. – Holger

답변

1

@Abe 가까이이었다. 나는 신호를 사용하고 기다릴 것이다 - 가장 가벼운 무게이기 때문에 Object 클래스 내장 함수를 사용한다.

Object sync = new Object(); // Can use an existing object if there's an appropriate one 

// On submit to queue 
synchronized (sync) { 
    queue.add(...); // Must be inside to avoid a race condition 
    sync.notifyAll(); 
} 

// On check for work in queue 
synchronized (sync) { 
    item = null; 
    while (item == null) { 
     // Need to check all of the queues - if there will be a large number, this will be slow, 
     // and slow critical sections (synchronized blocks) are very bad for performance 
     item = getNextQueueItem(); 
     if (item == null) { 
      sync.wait(); 
     } 
    } 
} 

sync.wait이 해제 통지까지 동기화에 대한 잠금 - 동기화에 잠금이 성공적으로 (그것이 중요 섹션의 일부 유형은 정말이 필요합니다 프로그래머에 대한 알림의 wait 메소드를 호출 할 필요가있다).

필자는 가능한 경우 생산자 전용 큐가 아닌 소비자 (또는 소비자 그룹) 전용 큐를 권장합니다. 솔루션을 단순화합니다.

0

여러 대기열에 대해 차단하려면 한 가지 옵션은 java의 Lock and Condition objects and then use the signal method을 사용하는 것입니다.

따라서 제작자가 데이터를 가지고있을 때마다 signallAll을 호출해야합니다.

Lock fileLock = new ReentrantLock(); 
Condition condition = fileLock.newCondition(); 
... 
// producer has to signal 
condition.signalAll(); 
... 
// consumer has to await. 
condition.await(); 

이 방법은 신호가 제공 될 때만 소비자가 가서 대기열을 확인합니다.

0

나는 @Abe가 제안한 것과 유사한 상황을 해결했지만 SemaphoreAtomicBoolean과 결합하여 사용하고 BinarySemaphore라고 불렀다. 생산자가해야할 일이있을 때 신호를 보내도록 생산자를 수정해야합니다. BinarySemaphore에 대한 코드 및 소비자 업무 루프가 어떻게 보일지의 일반적인 생각 아래
는 :

import java.util.concurrent.Semaphore; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.atomic.AtomicBoolean; 

public class MultipleProdOneConsumer { 

BinarySemaphore workAvailable = new BinarySemaphore(); 

class Consumer { 

    volatile boolean stop; 

    void loop() { 

     while (!stop) { 
      doWork(); 
      if (!workAvailable.tryAcquire()) { 
       // waiting for work 
       try { 
        workAvailable.acquire(); 
       } catch (InterruptedException e) { 
        if (!stop) { 
         // log error 
        } 
       } 
      } 
     } 
    } 

    void doWork() {} 

    void stopWork() { 
     stop = true; 
     workAvailable.release(); 
    } 
} 

class Producer { 

    /* Must be called after work is added to the queue/made available. */ 
    void signalSomethingToDo() { 
     workAvailable.release(); 
    } 
} 

class BinarySemaphore { 

    private final AtomicBoolean havePermit = new AtomicBoolean(); 
    private final Semaphore sync; 

    public BinarySemaphore() { 
     this(false); 
    } 

    public BinarySemaphore(boolean fair) { 
     sync = new Semaphore(0, fair); 
    } 

    public boolean release() { 

     boolean released = havePermit.compareAndSet(false, true); 
     if (released) { 
      sync.release(); 
     } 
     return released; 
    } 

    public boolean tryAcquire() { 

     boolean acquired = sync.tryAcquire(); 
     if (acquired) { 
      havePermit.set(false); 
     } 
     return acquired; 
    } 

    public boolean tryAcquire(long timeout, TimeUnit tunit) throws InterruptedException { 

     boolean acquired = sync.tryAcquire(timeout, tunit); 
     if (acquired) { 
      havePermit.set(false); 
     } 
     return acquired; 
    } 

    public void acquire() throws InterruptedException { 

     sync.acquire(); 
     havePermit.set(false); 
    } 

    public void acquireUninterruptibly() { 

     sync.acquireUninterruptibly(); 
     havePermit.set(false); 
    } 

} 

} 
관련 문제