2009-09-16 2 views
5

를 지정하면 (거부 아무런 작업을 의미 없음) 작업에 대한 모든 항상 적어도 5 스레드를해야합니다 집행 인을 만드는 방법, 20 개 스레드의 최대, 그리고 무제한의 큐ThreadPoolExecutor에 문제

내가 새로운 ThreadPoolExecutor(5, 20, 60L, TimeUnit.SECONDS, queue) 을 시도 있는가 내가 대기열에 대해 생각하는 모든 가능성 : 원하는대로

new LinkedBlockingQueue() // never runs more than 5 threads 
new LinkedBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting 
new ArrayBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting 
new SynchronousQueue() // no tasks can wait, after 20, they are rejected 

아무도이 일을하지 않습니다.

+0

무엇이 작동하지 않았 음을 의미합니까? 예외? – Gandalf

+4

너무 가깝습니다 : 간달프와 사룬! – akf

+0

엔트가 여기에 있어야합니다. . . 그 성가신 독수리는 어디에 있습니까 ... – Gandalf

답변

5

어쩌면 이런 식으로 뭔가를 할 수 있을까요? 나는 그것을 채찍질했다. 그래서 그것에 찌르다. 기본적으로,이 공급하는 데 사용되는 오버 플로우 스레드 풀을 구현하는 기본 ThreadPoolExecutor

나는 그것으로 볼 두 가지 주요 무승부의 백업이 있습니다

  • submit()에 반환 미래 오브젝트의 부족. 그러나 아마도 그것은 당신에게 문제가되지 않습니다.
  • 작업이 제출 될 때 보조 대기열은 ThreadPoolExecutor에만 비어 있습니다. 우아한 솔루션이 있어야하지만, 아직 보지 못했습니다. StusMagicExecutor에 대신 작업이 진행될 것이라는 것을 안다면 문제가되지 않을 수도 있습니다. ("5 월"이 핵심 단어입니다.) 완료된 후 제출 한 작업을 StusMagicExecutor에 찌르는 옵션이있을 수 있습니까?

스투의 매직 집행 인 :

public class StusMagicExecutor extends ThreadPoolExecutor { 
    private BlockingQueue<Runnable> secondaryQueue = new LinkedBlockingQueue<Runnable>(); //capacity is Integer.MAX_VALUE. 

    public StusMagicExecutor() { 
     super(5, 20, 60L, SECONDS, new SynchronousQueue<Runnable>(true), new RejectionHandler()); 
    } 
    public void queueRejectedTask(Runnable task) { 
     try { 
      secondaryQueue.put(task); 
     } catch (InterruptedException e) { 
      // do something 
     } 
    } 
    public Future submit(Runnable newTask) { 
     //drain secondary queue as rejection handler populates it 
     Collection<Runnable> tasks = new ArrayList<Runnable>(); 
     secondaryQueue.drainTo(tasks); 

     tasks.add(newTask); 

     for (Runnable task : tasks) 
      super.submit(task); 

     return null; //does not return a future! 
    } 
} 

class RejectionHandler implements RejectedExecutionHandler { 
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { 
     ((StusMagicExecutor)executor).queueRejectedTask(runnable); 
    } 
} 
+1

위대한, tnx, 이것은 내가 원했던 거의 것이다. 완료 후 하나의 태스크를 poking하는 runnable로 newTasks를 래핑하면, 2 차 큐에 태스크가있는 동안 20 개 중 하나의 스레드 만 유휴 상태가 될 수 있습니다. – Sarmun

1

ThreadPoolExecutor에 대한 javadocs는 일단 스레드가 생성 된 후 corePoolSize 스레드가 생성되면 큐가 가득 차게되면 새로운 스레드가 생성된다는 것을 분명히 알 수 있습니다. 따라서 core을 5로 설정하고 max을 20으로 설정하면 원하는 동작을 얻지 못합니다.

그러나 coremax을 모두 20으로 설정하면 20 개의 스레드가 모두 사용 중이면 작업이 대기열에 추가됩니다. 물론, 이것은 20 개의 모든 것이 살아있을 때까지 (어쨌든 유휴 상태가 될 때까지) "의미없는"5 스레드 최소 요구 사항을 약간 의미가 없게 만듭니다.

+0

"모든 코어 스레드가 죽을 수 있다고 말하면"나가는 "Idle Out"은 결코 일어나지 않을 것입니다. 어떤 방법 으로든 올바르게 사용할 수없는 경우 코어 크기와 최대 크기가 모두 표시되지 않습니다. 내 요구 사항을 충족하는 다른 클래스 (ThreadPoolExecutor 제외)가 있습니까? – Sarmun

1

나는이 문제가 클래스의 단점과 생성자 매개 변수 조합 주어 매우 잘못된 생각합니다. 다음은 SwingWorker의 내부 ThreadPoolExecutor에서 가져온 솔루션으로 최상위 클래스로 만들었습니다. 최소값은 없지만 적어도 상한값은 사용합니다. 내가 모르는 유일한 것은 잠금 실행에서 얻을 수있는 성능입니다.

public class BoundedThreadPoolExecutor extends ThreadPoolExecutor { 
    private final ReentrantLock pauseLock = new ReentrantLock(); 
    private final Condition unpaused = pauseLock.newCondition(); 
    private boolean isPaused = false; 
    private final ReentrantLock executeLock = new ReentrantLock(); 

    public BoundedThreadPoolExecutor(int maximumPoolSize, 
      long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { 
     super(0, maximumPoolSize, keepAliveTime, unit, workQueue); 
    } 

    public BoundedThreadPoolExecutor(int maximumPoolSize, 
      long keepAliveTime, TimeUnit unit, 
     BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { 
     super(0, maximumPoolSize, keepAliveTime, unit, workQueue, 
       threadFactory); 
    } 

    public BoundedThreadPoolExecutor(int maximumPoolSize, 
      long keepAliveTime, TimeUnit unit, 
      BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { 
     super(0, maximumPoolSize, keepAliveTime, unit, workQueue, 
       handler); 
    } 

    public BoundedThreadPoolExecutor(int maximumPoolSize, 
      long keepAliveTime, TimeUnit unit, 
      BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, 
      RejectedExecutionHandler handler) { 
     super(0, maximumPoolSize, keepAliveTime, unit, workQueue, 
       threadFactory, handler); 
    } 

    @Override 
    public void execute(Runnable command) { 
     executeLock.lock(); 
     try { 
      pauseLock.lock(); 
      try { 
       isPaused = true; 
      } finally { 
       pauseLock.unlock(); 
      } 
      setCorePoolSize(getMaximumPoolSize()); 
      super.execute(command); 
      setCorePoolSize(0); 
      pauseLock.lock(); 
      try { 
       isPaused = false; 
       unpaused.signalAll(); 
      } finally { 
       pauseLock.unlock(); 
      } 
     } finally { 
      executeLock.unlock(); 
     } 
    } 

    @Override 
    protected void afterExecute(Runnable r, Throwable t) { 
     super.afterExecute(r, t); 
     pauseLock.lock(); 
     try { 
      while (isPaused) { 
       unpaused.await(); 
      } 
     } catch (InterruptedException ignore) { 

     } finally { 
      pauseLock.unlock(); 
     } 
    } 
}