생산자 - 소비자 환경에서 BlockingQueue<Runnable>
(ScheduledThreadPoolExecutor
에서 가져옴)이 있습니다. 큐에 작업을 추가하는 하나의 스레드와 스레드를 실행하는 스레드 풀이 있습니다. isEmpty() 변경시 알림 대기열
알림 큐에서 제거
- 첫 번째 항목이 비어있는 큐에 추가 :
나는이 이벤트에 대한 통지를해야합니다.
구현할 수있는 합리적인 방법이 있습니까?
생산자 - 소비자 환경에서 BlockingQueue<Runnable>
(ScheduledThreadPoolExecutor
에서 가져옴)이 있습니다. 큐에 작업을 추가하는 하나의 스레드와 스레드를 실행하는 스레드 풀이 있습니다. isEmpty() 변경시 알림 대기열
알림 큐에서 제거
나는이 이벤트에 대한 통지를해야합니다.
구현할 수있는 합리적인 방법이 있습니까?
간단하고 순진한 방법은 BlockingQueue를 기본 큐를 검사 한 다음 알림을 수행하는 작업을 게시하는 구현으로 꾸미는 것입니다.
NotifyingQueue<T> extends ForwardingBlockingQueue<T> implements BlockingQueue<T> {
private final Notifier notifier; // injected not null
…
@Override public void put(T element) {
if (getDelegate().isEmpty()) {
notifier.notEmptyAnymore();
}
super.put(element);
}
@Override public T poll() {
final T result = super.poll();
if ((result != null) && getDelegate().isEmpty())
notifier.nowEmpty();
}
… etc
}
이 방법에도 몇 가지 문제가 있습니다. empty -> notEmpty는 매우 직설적이지만 특히 단일 생산자의 경우 두 소비자가 동시에 실행하는 것이 쉬우 며 대기열이 비어 있지 않음 → 비어 있음을 볼 수 있습니다.
하지만, 당신이 원하는 모든이, 다음이, 한 당신의 통지가 귀하의 상태 기계로 충분 추적합니다 큐가 시간에서 빈되었다고 통보하는 공허함과 비 공허함과 통지를 하나에서 다른 변경 :
이 지금 집행자에 아마 게시물의 작업을 비동기 적으로 데이터베이스에 이벤트를 기록 할 수있는 실제 알리미 구현 주위에 thread 세이프 가드AtomicStateNotifier implements Notifier {
private final AtomicBoolean empty = new AtomicBoolean(true); // assume it starts empty
private final Notifier delegate; // injected not null
public void notEmptyAnymore() {
if (empty.get() && empty.compareAndSet(true, false))
delegate.notEmptyAnymore();
}
public void nowEmpty() {
if (!empty.get() && empty.compareAndSet(false, true))
delegate.nowEmpty();
}
}
.
디자인에 결함이있을 수 있지만 비교적 간단하게 처리 할 수 있습니다. 하나의 스레드가 추가되어 추가하기 전에 확인할 수 있습니다. 즉 pool.getQueue().isEmpty()
- 생산자 한 명당, 이는 안전합니다.
마지막으로 제거 된 항목은 보장 할 수 없지만 beforeExecute
을 무시하고 다시 대기열을 확인할 수 있습니다. 가능하면 isEmpty()
이후에 작은 시간 초과가 발생하면 true를 반환합니다. 아마도 아래 코드는 afterExecute에서 대신 실행하는 것이 좋습니다. 수영장에서 실행되는 당신이 작업을 추가 상상, 작업이 즉시 예정 : 언젠가 그
알림을하고 왜 큐/자체가 잘 작동하지 않습니다 승 내가 설명 할 수와 같은
protected void beforeExecute(Thread t, Runnable r) {
if (getQueue().isEmpty()){
try{
Runnable r = getQueue().poll(200, TimeUnit.MILLISECONDS);
if (r!=null){
execute(r);
} else{
//last message - or on after execute by Setting a threadLocal and check it there
//alternatively you may need to do so ONLY in after execute, depending on your needs
}
}catch(InterruptedException _ie){
Thread.currentThread().interrupt();
}
}
}
대기열이 다시 비어 있고 알림이 필요합니다.