2016-12-21 3 views
1

ThreadExecutorPool 안에 사용자 지정 blockingqueue를 사용하고 있지만 작업 근로자가 작업을 수행하지 않고 dispissher 스레드가 새 작업을 큐에 넣지 않는 경우가 있습니다.사용자 지정 LinkedBlockingQueue 교착 상태

다음 사용자 지정 차단 대기열 구현으로 인해 교착 상태가 발생합니다. 이 코드에 문제가 있습니까? add()take() 방법에 대해서는 synchronized 블록이 더 낫습니다.

import java.util.Collection; 
import java.util.concurrent.LinkedBlockingQueue; 

import org.apache.log4j.Logger; 

import com.ttech.utils.alarm.Alarm; 
import com.ttech.utils.alarm.AlarmInterface; 
import com.ttech.utils.counter.Counter; 
import com.ttech.utils.counter.SNMPAgent; 

public class WorkerQueue<E> extends LinkedBlockingQueue<E> { 

    private static final long serialVersionUID = 1L; 

    public Integer lowThreshold; 

    public Integer highThreshold; 

    public Integer capacity; 

    public String name; 

    public String type; 

    public Counter counter = null; 

    public boolean writeAlarmLog; 

    public static final Logger logger = Logger.getLogger(WorkerQueue.class); 

    public static Alarm HighThresholdAlarm = null; 
    public static Alarm CapacityAlarm = null; 

    // Check the size here and clear capacity and high threshold alarms in case 
    public E take() throws InterruptedException { 
     E data = super.take(); 
     counter.setNewValue(super.size()); 
     if (super.size() == lowThreshold) {    
      if(!this.writeAlarmLog) { 
       HighThresholdAlarm.clear(name); 
       CapacityAlarm.clear(name); 
      } else { 
       HighThresholdAlarm.clearLog(name, "Queue High Threshold"); 
       CapacityAlarm.clearLog(name, "Queue Capacity Overload"); 
      } 
     } 
     return data; 
    } 

    public E poll() { 
     E data = super.poll(); 
     counter.setNewValue(super.size()); 
     if (super.size() == lowThreshold) { 
      if(!this.writeAlarmLog) { 
       HighThresholdAlarm.clear(name); 
       CapacityAlarm.clear(name); 
      } else { 
       HighThresholdAlarm.clearLog(name, "Queue High Threshold"); 
       CapacityAlarm.clearLog(name, "Queue Capacity Overload"); 
      } 
     } 
     return data; 
    } 


    public int drainTo(Collection<? super E> c, int maxElements){ 
     int size = super.drainTo(c,maxElements);  
     counter.setNewValue(super.size());  
     return size; 
    } 

    // During adding the data to queue check capacity and high threshold raise alarm in case 
    public boolean add(E data) { 
     Boolean rc = true; 

     if (capacity > 0) { 
      if (this.size() >= capacity) { 
       logger.error("Queue " + name + " is over capacity"); 
       if(!this.writeAlarmLog) 
        CapacityAlarm.raise(name); 
       else 
        CapacityAlarm.raiseLog(AlarmInterface.AS_CRITICAL, name, "Queue Capacity Overload"); 
       return false; 
      } 
     } 

     if (!super.add(data)) { 
      logger.error("Cannot add data to queue:" + name); 
      rc = false; 
     } else { 
      counter.setNewValue(super.size()); 
     } 

     if (highThreshold == super.size()) { 


      if(!this.writeAlarmLog) 
       HighThresholdAlarm.raise(name); 
      else 
       HighThresholdAlarm.raiseLog(AlarmInterface.AS_CRITICAL, name, "Queue High Threshold"); 
     } 

     return rc; 
    } 
} 
+0

예 클래스 클래스 멤버 (변수 및 개체)가 있기 때문에 스레드 안전을 위해'synchronized' 메서드가 필요하다고 생각합니다. AFAIK 스레드로부터 안전하지 않습니다. – Yazan

+1

내 조언은 동시성 문제로 이어지기 때문에 동시 데이터 구조를 하위 클래스 화하지 않는 것입니다. 또한 알람을 발생시키는 원인이되는 큐를 만드는 것이 예상되는 범위를 벗어난 것으로 보입니다 (단지 데이터 구조라는 것을 기억하십시오). Teppic이 제안했듯이'ThreadPoolExecutor'를 서브 클래스 화하고'beforeExecute'와'afterExecute'를 사용하여 목표를 달성해야합니다. – Spotted

+0

참고 : 모든 '공개'속성은 정말 무섭다! 이 수업을 즐겁게 지키십시오. ;-) – Spotted

답변

2

ThreadPoolExecutor는 작업 큐에하지 add 작업을 수행합니다. 그것은 offers이고 허용되지 않으면 구성된 RejectedExecutionHandler으로 전달됩니다. 기본적으로이 값은 abort policy handler이며 RejectedExecutionException이 발생합니다.

사용자 지정 큐의 add 메서드는 절대로 호출되지 않습니다.

기내 작업 수의 변경 사항을 추적하려면 실행 프로그램 자체의 beforeExecute 또는 afterExecute 메서드를 재정의하는 것이 좋습니다. 활성 작업 수는 getActiveCount에서 얻을 수 있습니다.

+0

고마워, 나는 또한 제안() 메서드를 재정의했습니다. 그것은 작동합니다. 이러한 재정의 된 메소드에 대해 동기화 된 블록을 추가해야합니까? –