2010-01-28 5 views
3

나는 ExecutorService 클래스/인터페이스 시리즈를 좋아합니다. 스레드에 대해 걱정할 필요가 없습니다. ExecutorService 인스턴스를 가져 와서 작업 예약에 사용하고, 8 스레드 또는 16 스레드 풀을 사용하려는 경우, 걱정하지 않아도됩니다. ExecutorService가 어떻게 설정되어 있는지. 만세!java : 다중 스레드/단일 스레드 작업 큐 결합

하지만 일부 작업을 연속적으로 실행해야한다면 어떻게해야합니까? 이상적으로는 ExecutorService에게 단일 스레드에서 이러한 작업을 예약하도록 요청할 것이지만 그렇게하는 방법은없는 것으로 보입니다.

편집 : 가이거 카운터의 예를 클릭하거나 키 입력을하는 작업이 미리 알려져 있지 않다, 그들은 이상하게 다양한 종류의 이벤트 (임의/알 수없는 도착 과정을 생각에 의해 생성되는 작업의 무제한 시리즈는 이벤트).

+0

단일 스레드 이벤트의 목적 상 별도의 단일 스레드 실행 프로그램이 있습니다. 그것은 하나의 스레드를 더 사용할 수도 있음을 의미하지만 그 영향은 그다지 크지 않다고 생각합니다. –

+0

가능한 복제본 : http://stackoverflow.com/questions/2153663/controlling-task-execution-order-with-executorservice – finnw

+0

@finnw, 요점이 있습니다. 동일하지 않은지 확실히 모르겠다. (다중 쓰레드 일 수있는 하나의 ExecutorService를 사용하도록 제한하려고한다.) –

답변

6

Runnable의 구현을 작성하여 일부 작업을 수행하고 순차적으로 실행하도록 할 수 있습니다. 같은

뭔가 : 내가 대기열과 한 번에 하나를 실행하려는 작업에 Executors.newSingleThreadExecutor()로 만든 별도의 집행을 사용하고

public class SerialRunner implements Runnable { 
    private List<Runnable> tasks; 

    public SerialRunner(List<Runnable> tasks) { 
     this.tasks = tasks; 
    } 

    public void run() { 
     for (Runnable task: tasks) { 
      task.run(); 
     } 
    } 
} 
+0

이 작업은 작동하지 않습니다. 작업은 한 번에 하나씩 대기열에 포함되는 무제한의 일련의 작업입니다. –

+0

이 경우, 실행중인 다른 작업과 관련하여 명령의 순서가 중요하지 않은 경우 Executors.newSingleThreadExecutor()를 사용하십시오 – danben

+0

+1 :이 대답은 내가 원한 것이 아니지만 나에게 아이디어를줍니다 ... –

2

. 또 다른 방법은 여전히 ​​myTask2myTask1이 예외가 발생하는 경우에도 실행하려면 좀 더 정교해야 할 수 있지만 단지

executor.submit(new Runnable() { 
    public void run() { 
     myTask1.call(); 
     myTask2.call(); 
     myTask3.call(); 
    }}); 

, 여러 작업을 작성하고 그 중 하나를 제출하는 것입니다.

+0

별도의 실행 프로그램을 사용하면 효과적 일 수 있지만 가능한 경우 피하고 싶은 제약 조건입니다. –

1

내가 수행하는 방식은 작업 자체의 키가 무엇인지에 따라 스트림이 다른 스레드로 작동하도록하는 자체 생성 코드를 사용하는 것입니다 (이것은 완전히 임의적이거나 의미있는 값일 수 있음). Queue에 제공하고 다른 스레드가 작동하지 않고 (또는 귀하의 경우 ExecutorService으로 작업을 처리하고 내부 작업 대기열을 제거하는 스레드 풀을 유지 관리하는 서비스) 대신 Pipelineable (일명 작업)을 PipelineManager에 보내면 해당 작업의 키에 대한 올바른 대기열을 찾고 해당 대기열에 작업을 고정시킵니다. 동일한 키에 대해 제공되는 모든 작업이 연속적으로 실행되도록 보장하기 위해 대기열을 제거하는 스레드를 관리하는 여러 가지 코드가 있습니다.

이 접근법을 사용하면 n 개의 일련 작업에 대한 특정 키를 쉽게 설정하고 이전 순서로 갈 수있는 작업의 나머지 키들을 라운드 로빈 방식으로 처리 할 수 ​​있습니다. 또는 특정 파이프 (스레드)를 현명하게 유지할 수 있습니다 키 선택. 그들은 (IS 적어도 ThreadPoolExecutor) 단일 BlockingQueue 바탕 따라서 오래된 위해이 일을 "말할 수있는 방법은 없습니다 있기 때문에

이 방법은 JDK ExecutorService 구현 가능하지 않지만이 작품은해야합니다 연재 ". 나는 당신이 danben의 코멘트에 따라 singleThreadExecutor에 모든 것을 집어 넣는 것만으로 처리량을 유지하기를 원한다고 가정하고 있습니다.

(편집) 같은 추상화를 유지하는 대신 무엇을 할 수 있는지

은의 많은 인스턴스 대표는 ThreadPoolExecutor (또는 유사한) 당신이 필요로하는 ExecutorService 당신 자신의 구현을 만드는 만드는 것입니다; 1은 n 개의 스레드와 1 개 이상의 단일 스레드 인스턴스에 의해 지원됩니다.다음과 같은 뭔가 (어떠한 방식으로 모든 코드를 작동하는 그러나 희망 당신이 얻을 생각!)

public class PipeliningExecutorService<T extends Pipelineable> implements ExecutorService { 
    private Map<Key, ExecutorService> executors; 
    private ExecutorService generalPurposeExecutor; 

    // ExecutorService methods here, for example 
    @Override 
    public <T> Future<T> submit(Callable<T> task) { 
     Pipelineable pipelineableTask = convertTaskToPipelineable(task); 
     Key taskKey = pipelineable.getKey(); 
     ExecutorService delegatedService = executors.get(taskKey); 
     if (delegatedService == null) delegatedService = generalPurposeExecutor; 
     return delegatedService.submit(task); 
    } 
} 

public interface Pipelineable<K,V> { 
    K getKey(); 
    V getValue(); 
} 

은 서비스 자체 반대로 ExecutorService 방법은 일반적인 것으로,이 목적을 위해, 꽤 추한하는 당신이 할 수 없다면 Pipelineable과 fallback으로 전달되는 것을 마샬링하기위한 표준 방법이 필요하다는 것을 의미합니다 (예 : 범용 풀에 던져 넣음).

0

흠,이게 효과가 있을지는 모르겠지만 뭔가 (아마 테스트되지 않은 코드)가 될지도 모릅니다. 이는 미묘한 부분을 뛰어 넘지 만 (예외 처리, 취소, 기본 Executor의 다른 작업에 대한 공정성 등) 유용 할 수 있습니다.

class SequentialExecutorWrapper implements Runnable 
{ 
    final private ExecutorService executor; 

    // queue of tasks to execute in sequence 
    final private Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>(); 

    // semaphore for pop() access to the task list 
    final private AtomicBoolean taskInProcess = new AtomicBoolean(false); 

    public void submit(Runnable task) 
    { 
     // add task to the queue, try to run it now 
     taskQueue.offer(task); 
     if (!tryToRunNow()) 
     { 
      // this object is running tasks on another thread 
      // do we need to try again or will the currently-running thread 
      // handle it? (depends on ordering between taskQueue.offer() 
      // and the tryToRunNow(), not sure if there is a problem) 
     } 
    } 

    public void run() 
    { 
     tryToRunNow(); 
    } 

    private boolean tryToRunNow() 
    { 
     if (taskInProcess.compareAndSet(false, true)) 
     { 
      // yay! I own the task queue! 
      try { 
       Runnable task = taskQueue.poll(); 
       while (task != null) 
       { 
        task.run(); 
        task = taskQueue.poll(); 
       } 
      } 
      finally 
      { 
       taskInProcess.set(false); 
      } 
      return true; 
     } 
     else 
     { 
      return false; 
     } 
    }