2011-01-31 2 views
1

생산자 - 소비자 환경에서 BlockingQueue<Runnable> (ScheduledThreadPoolExecutor에서 가져옴)이 있습니다. 큐에 작업을 추가하는 하나의 스레드와 스레드를 실행하는 스레드 풀이 있습니다. isEmpty() 변경시 알림 대기열

  • 마지막 항목이 데이터베이스에 메시지를 작성 =
  • 알림 큐에서 제거

    1. 첫 번째 항목이 비어있는 큐에 추가 :

      나는이 이벤트에 대한 통지를해야합니다.

      구현할 수있는 합리적인 방법이 있습니까?

    답변

    1

    간단하고 순진한 방법은 BlockingQueue를 기본 큐를 검사 한 다음 알림을 수행하는 작업을 게시하는 구현으로 꾸미는 것입니다.

    NotifyingQueue<T> extends ForwardingBlockingQueue<T> implements BlockingQueue<T> { 
        private final Notifier notifier; // injected not null 
    
        … 
    
        @Override public void put(T element) { 
        if (getDelegate().isEmpty()) { 
         notifier.notEmptyAnymore(); 
        } 
        super.put(element); 
        } 
    
        @Override public T poll() { 
        final T result = super.poll(); 
        if ((result != null) && getDelegate().isEmpty()) 
         notifier.nowEmpty(); 
        } 
        … etc 
    } 
    

    이 방법에도 몇 가지 문제가 있습니다. empty -> notEmpty는 매우 직설적이지만 특히 단일 생산자의 경우 두 소비자가 동시에 실행하는 것이 쉬우 며 대기열이 비어 있지 않음 → 비어 있음을 볼 수 있습니다.

    하지만, 당신이 원하는 모든이, 다음이, 한 당신의 통지가 귀하의 상태 기계로 충분 추적합니다 큐가 시간에서 빈되었다고 통보하는 공허함과 비 공허함과 통지를 하나에서 다른 변경 :

    이 지금 집행자에 아마 게시물의 작업을 비동기 적으로 데이터베이스에 이벤트를 기록 할 수있는 실제 알리미 구현 주위에 thread 세이프 가드
    AtomicStateNotifier implements Notifier { 
        private final AtomicBoolean empty = new AtomicBoolean(true); // assume it starts empty 
        private final Notifier delegate; // injected not null 
    
        public void notEmptyAnymore() { 
        if (empty.get() && empty.compareAndSet(true, false)) 
         delegate.notEmptyAnymore(); 
        } 
    
        public void nowEmpty() { 
        if (!empty.get() && empty.compareAndSet(false, true)) 
         delegate.nowEmpty(); 
        } 
    } 
    

    .

    0

    디자인에 결함이있을 수 있지만 비교적 간단하게 처리 할 수 ​​있습니다. 하나의 스레드가 추가되어 추가하기 전에 확인할 수 있습니다. 즉 pool.getQueue().isEmpty() - 생산자 한 명당, 이는 안전합니다.

    마지막으로 제거 된 항목은 보장 할 수 없지만 beforeExecute을 무시하고 다시 대기열을 확인할 수 있습니다. 가능하면 isEmpty() 이후에 작은 시간 초과가 발생하면 true를 반환합니다. 아마도 아래 코드는 afterExecute에서 대신 실행하는 것이 좋습니다. 수영장에서 실행되는 당신이 작업을 추가 상상, 작업이 즉시 예정 : 언젠가 그

    알림을하고 왜 큐/자체가 잘 작동하지 않습니다 승 내가 설명 할 수와 같은

    protected void beforeExecute(Thread t, Runnable r) { 
        if (getQueue().isEmpty()){ 
        try{ 
         Runnable r = getQueue().poll(200, TimeUnit.MILLISECONDS); 
         if (r!=null){ 
         execute(r); 
         } else{ 
         //last message - or on after execute by Setting a threadLocal and check it there 
         //alternatively you may need to do so ONLY in after execute, depending on your needs 
        } 
        }catch(InterruptedException _ie){ 
         Thread.currentThread().interrupt(); 
        } 
        } 
    } 
    

    대기열이 다시 비어 있고 알림이 필요합니다.