2009-09-30 2 views
2

교착 상태 :두 BlockingQueue를 - 나는 원자 두 개의 큐를 조작하는 요구 사항이 올바른 동기화 전략이 무엇인지 확실하지 않다

public class transfer { 

    BlockingQueue firstQ; 
    BlockingQueue secondQ; 

    public moveToSecond() { 
     synchronized (this){ 
      Object a = firstQ.take(); 
      secondQ.put(a) 
     } 
    } 

    public moveToFirst() { 
     synchronized(this) { 
      Object a = secondQ.take(); 
      firstQ.put(a); 
     } 
    } 
} 

이 올바른 패턴이 내가하려고했던 무엇인가? moveToSecond() 메서드에서 firstQ가 비어 있으면 메서드는 firstQ.take()에서 대기하지만이 객체에 대한 잠금을 유지합니다. 이렇게하면 moveToFirst()가 실행할 기회가 없습니다.

기다리는 동안 잠금 해제에 대해 혼란스러워합니다. - 스레드가 모든 잠금을 해제합니까? [이 모두와 Blockedqueue 잠금을 해제합니까?]? 다중 블로킹 큐를 다루는 원 자성을 제공하는 올바른 패턴은 무엇입니까?

답변

0

당신은 다음과 같이 java.util.concurrency에서 잠금-메커니즘을 사용해야합니다

Lock lock = new ReentrantLock(); 
.... 
lock.lock(); 
try { 
    secondQ.put(firstQ.take()); 
} finally { 
    lock.unlock(); 
} 

는 firstQ.put에 대해 동일한 작업을 수행 (secondQ.take을()), 동일한 잠금 개체를 사용.

새로운 동시성 프리미티브를 작성하지 않는 한 Object 클래스에서 하위 수준의 wait/notify 메서드를 사용할 필요가 없습니다.

+1

효율상의 이유로 Java 1.5 이상의 대기에서 잠금을 권장했습니다. Java 1.6에서는 거의 동일한 수준의 효율성을 제공합니다. 향후의 VM 최적화로 인해 대기/통지 구현이 더 빨라질 수 있습니다. –

+0

그러나 잠금 장치를 올바르게 사용하는 것이 훨씬 쉽습니다. – JesperE

+0

첫 번째 큐가 비어있는 OP 문제를 어떻게 해결할 수 있습니까? 코드는 여전히 take() 메소드에서 무기한 차단됩니다. – Adamski

0

코드에서 BlockingQueue.take()에 스레드가 차단되어있는 동안 this의 잠금 장치를 누르고 있습니다. 코드가 동기화 된 블록을 벗어나거나 this.wait()이 호출 될 때까지 잠금이 해제되지 않습니다.

여기서는 moveToFirst()moveToSecond()이 차단되어야하며 클래스가 모든 대기열 액세스를 제어한다고 가정합니다.

private final BlockingQueue<Object> firstQ = new LinkedBlockingQueue(); 
private final Semaphore firstSignal = new Semaphore(0); 
private final BlockingQueue<Object> secondQ = LinkedBlockingQueue(); 
private final Semaphore secondSignal = new Semaphore(0); 

private final Object monitor = new Object(); 

public void moveToSecond() { 
    int moved = 0; 
    while (moved == 0) { 

    // bock until someone adds to the queue 
    firstSignal.aquire(); 

    // attempt to move an item from one queue to another atomically 
    synchronized (monitor) { 
     moved = firstQ.drainTo(secondQ, 1); 
    } 
    } 
} 

public void putInFirst(Object object) { 
    firstQ.put(object); 

    // notify any blocking threads that the queue has an item 
    firstSignal.release(); 
} 

당신은 moveToFirst()putInSecond() 비슷한 코드를 가지고있다. while은 다른 코드가 항목을 대기열에서 제거 할 수있는 경우에만 필요합니다. 대기 행렬을 기다리는 대기열을 제거하는 메소드가 세마포어로부터 허가를 받아야하고, 세마포어가 공정한 세마포어로 작성되어야하므로 aquire을 호출하는 첫 번째 스레드가 먼저 해제됩니다.

firstSignal = new Semaphore(0, true); 

당신은 당신이

  1. Executor
  2. 에 보낸 Runnable에서 작업을해야합니까 방법을 가지고 몇 가지 옵션이 차단 moveToFirst()에 타임 아웃을 전달하고를 사용하는 moveToFirst()을하지 않으려면
  3. BlockingQueue.drainTo(secondQ, 1)을 사용하고 moveToFirst()을 수정하여 성공했는지 여부를 나타내는 부울을 반환합니다.

위 세 가지 옵션의 경우 세마포어가 필요하지 않습니다.

마지막으로 나는 이동을 원자 적으로 만들 필요성에 의문을 제기합니다. 여러 스레드가 큐에 추가하거나 큐에서 제거하는 경우 관찰 큐는 moveToFirst()이 원자인지 여부를 알 수 없습니다.

2

공통 뮤텍스를 사용하여 두 큐를 동기화하는 올바른 방법을 사용하고 있습니다.그러나 첫 번째 큐가 비어있는 상황을 피하기 위해 moveToFirst()moveToSecond()을 다시 구현하여 take() 대신 poll()을 사용하는 것이 좋습니다. 예 : 당신은 언급하지 않았다

public void boolean moveToFirst() { 
    // Synchronize on simple mutex; could use a Lock here but probably 
    // not worth the extra dev. effort. 
    synchronzied(queueLock) { 
    boolean success; 

    // Will return immediately, returning null if the queue is empty. 
    Object o = firstQ.poll(); 

    if (o != null) { 
     // Put could block if the queue is full. If you're using a bounded 
     // queue you could use add(Object) instead to avoid any blocking but 
     // you would need to handle the exception somehow. 
     secondQ.put(o); 
     success = true; 
    } else { 
     success = false; 
    } 
    } 

    return success; 
} 
1

또 다른 실패 조건은 firstQ가 비어 있지 않지만 secondQ가 꽉 찬 경우, 항목이 firstQ에서 제거됩니다하지만 넣어 곳이 없을 것입니다.

올바른 방법은 설문 조사를 사용하고 제한 시간과 코드를 제공하여 모든 실패 (중요!) 이전 상태로 되돌리고, 설문 조사와 제안이 모두 성공할 때까지 임의의 시간 후에 다시 시도하는 것입니다.

이것은 낙관적 인 접근 방식입니다. (평균 대기 시간은 선택한 시간 제한에 따라 달라짐)

관련 문제