2012-08-08 3 views
0

큰 작업을 처리하는 JobService가 있습니다. 작업은 여러 작업으로 동적으로 세분화되며, 작업은 하위 작업 등을 생성 할 수도 있으므로 작업의 총 작업 수를 예측할 수 없습니다. 각 작업은 ExecutorService.submit(...)을 통해 실행되도록 대기열에 대기합니다. '작업 대기열'이 완료 될 때를 알 수있는 유일한 방법은 ExecutorService.awaitTermination(...)을 사용하는 것이므로 문제는 각 작업에 대해 별도의 ExecutorService를 만들어야하는 것 같습니다. 이는 작업과 해당 ExecutorService 사이에 I can't share a single threadpool이 있기 때문에 비효율적 인 것으로 보입니다.공유 ExecutorService를 작업 대기열로 사용하면 작업 완료 시점을 어떻게 알 수 있습니까?

일부 대안을 찾고 있는데, 각 작업에 AtomicInteger를 사용하려고 생각했습니다. 새 작업을 제출하면 작업을 증가시키고 작업이 완료되면 작업을 감소시킵니다. 그러나 그때 그것이 0 일 때를 조사해야하고, 그것은 예외 처리 혼란뿐만 아니라 지저분 해 보입니다.

더 나은 해결책이있는 것 같습니다.

+3

['CompletionService'] (http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CompletionService.html)을 검토 했습니까? ['submit'] (http://goo.gl/lmHAP)에 'Job'이 완료 됨과 연결된 값을 사용할 수 있습니다. – oldrinb

+2

샘플 코드는 [ExecutorCompletionService의 javadoc] (http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html)을 참조하십시오. – assylias

+0

'CompletionService'를 살펴 봤지만 개별 결과 처리에 신경 쓰지 않아서 원하는 바가 아닌 것 같았습니다. 모두 완료되었을 때를 알고 싶습니다. 그래서 poll()/take() 하나 하나 지저분 해 보였습니다, 나는 단지 completionQueue가 언제 비어 있는지 알고 싶습니다. 어쩌면 나는 그것을 나의 목적을 위해 확장하는 것을보아야한다. – Brian

답변

4

Submit는 작업 완료시 대기하는 데 사용할 수있는 Future 개체를 반환합니다. 이러한 작업을 추적하고 모든 하위 작업이 완료 될 때까지 반복적으로 차단하는 메서드를 추가 할 수 있습니다. 이렇게하면 필요한 곳에서 실행 프로그램을 재사용 할 수 있습니다.

import java.util.ArrayList; 
import java.util.Collection; 
import java.util.List; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 
import java.util.concurrent.atomic.AtomicBoolean; 

public class JobExecutor { 
    ExecutorService executorService = Executors.newFixedThreadPool(1); 

    private class Task implements Runnable { 
     private final String name; 
     private final Task[] subtasks; 
     private final ExecutorService executorService; 
     private volatile boolean started = false; 
     private Future<?> taskFuture; 

     // Separate list from subtasks because this is what you'll probably 
     // actually use as you may not be passing subtasks as constructor args 
     private final List<Task> subtasksToWaitOn = new ArrayList<Task>(); 

     public Task(String name, ExecutorService executorService, 
       Task... subtasks) { 
      this.name = name; 
      this.executorService = executorService; 
      this.subtasks = subtasks; 
     } 

     public synchronized void start() { 
      if (!started) { 
       started = true; 
       taskFuture = executorService.submit(this); 
      } 
     } 

     public synchronized void blockTillDone() { 
      if (started) { 
       try { 
        taskFuture.get(); 
       } catch (InterruptedException e) { 
        // TODO Handle 
       } catch (ExecutionException e) { 
        // TODO Handle 
       } 
       for (Task subtaskToWaitOn : subtasksToWaitOn) { 
        subtaskToWaitOn.blockTillDone(); 
       } 
      } else { 
       // TODO throw exception 
      } 
     } 

     @Override 
     public void run() { 
      for (Task subtask : subtasks) { 
       subtask.start(); 
       subtasksToWaitOn.add(subtask); 
      } 
      System.out.println("My name is: " + name); 
     } 
    } 

    void testSubmit() { 
     Task subsubTask1 = new Task("Subsubtask1", executorService); 
     Task subtask1 = new Task("Subtask1", executorService, subsubTask1); 
     Task subtask2 = new Task("Subtask2", executorService); 
     Task subtask3 = new Task("Subtask3", executorService); 
     Task job = new Task("Job", executorService, subtask1, subtask2, 
       subtask3); 
     job.start(); 
     job.blockTillDone(); 
     System.out.println("Job done!"); 
    } 

    public static void main(String[] args) { 
     new JobExecutor().testSubmit(); 
    } 
} 

인쇄 아웃 :

당신이합니다 (백 포트 라이브러리 http://www.cs.washington.edu/homes/djg/teachingMaterials/grossmanSPAC_forkJoinFramework.html 또는 java6) java7에 경우
My name is: Job 
My name is: Subtask1 
My name is: Subtask2 
My name is: Subtask3 
My name is: Subsubtask1 
Job done! 
+0

감사! 이것은 일종의 방향입니다. 문제는 작업이 동적으로 앞쪽에 추가되지 않으므로 blockTillDone()에서 subtaskToWaitOn 루프에서 ConcurrentModificationExceptions를 가져옵니다. – Brian

+0

이 솔루션은이를 고려했습니다. 부모 작업의 run 메서드가 완료 될 때까지 블록을 완료 할 때까지 차단합니다 (taskFuture.get()). 이 시점에서 모든 하위 작업이 추가되었습니다. 서브 타스크의 동적 추가를 시뮬레이트하기 위해 run 메소드 내에 서브 타스크 ToWaitOn에 실제로 모든 서브 타스크를 추가하고 있습니다. – Bryan

+0

당신 말이 맞았습니다. 문제를 일으키는 여분의 쓰레기가있었습니다. 나는 지금 꽤 잘 작동하고있다. 도와 줘서 고마워!googler를 위해, 나는 아직도 Fork-Join 풀로 가고 싶다고 생각하지만, 프로젝트가 Java7에 도착할 때까지 기다릴 것이다. – Brian

1

, 당신은 이런 종류의에 대한 풀을 포크 - 가입 고려하는 것이 좋습니다 :

class MainTask extends RecursiveTask<Long> { 

    @Override 
    protected Long compute() { 
     SubTask subtask0 = new SubTask(0L); 
     SubTask subtask1 = new SubTask(1L); 
     SubTask subtask2 = new SubTask(2L); 
     SubTask subtask3 = new SubTask(3L); 
     SubTask subtask4 = new SubTask(4L); 
     SubTask subtask5 = new SubTask(5L); 

     subtask1.fork(); 
     subtask2.fork(); 
     subtask3.fork(); 
     subtask4.fork(); 
     subtask5.fork(); 

     return subtask0.compute() + 
       subtask1.join() + 
       subtask2.join() + 
       subtask3.join() + 
       subtask4.join() + 
       subtask5.join(); 
    } 

} 

class SubTask extends RecursiveTask<Long> { 
    private Long rawResult = null; 

    private Long expected = null; 

    public SubTask(long expected) { 
     this.expected = expected; 
    } 

    @Override 
    protected Long compute() { 
     return expected; 
    } 
} 

public static void main(String[] args) 
{ 
    ForkJoinPool forkJoinPool = new ForkJoinPool(); 
    Long result = forkJoinPool.invoke(new MainTask()); 
    System.out.println(result); 
} 

분명히 여기에는 하드 코드 된 하위 작업이 있지만, 주 작업에 매개 변수를 전달할 수없고 그 작업을 사용하여 하위 작업을 생성 할 이유가 없습니다. 하위 작업 자체는 모두 동일한 유형이어야하지는 않지만 모두 RecursiveTask를 확장해야합니다. 현실적으로 태스크가 하위 태스크 (위의 MainTask와 같은)를 생성하는 경우, 적어도 하나의 하위 태스크가 직접 "계산"(즉, 포크 및 조인)을 가져야하므로 현재 스레드가 일부 계산을 실행할 수 있고 다른 스레드는 스레드가 나머지를 처리합니다.

+0

ps : 이런 종류의 일은 ForkJoinPool이 의도 한 바입니다. – Matt

+0

지금 java6가 붙어 있지만 가장 우아하고 정확한 해결책처럼 보입니다. jsr166에 대해 더 자세히 설명하겠습니다. – Brian

관련 문제