2009-09-18 2 views
3

느린 계산 (N은 상당히 큰 숫자)을 실행해야하며 속도가 느린 계산에는 많은 IO 대기 시간이 걸리기 때문에 M 스레드에서 수행해야합니다. 나는 모든 계산이 성공한 경우에 잘 작동하는 작은 예제를 작성했습니다. 그러나 계산에 실패하면 원하는 계산을 더 이상 처리하지 않는 것이 좋습니다. 각각의 성공적인 계산은 이미 결과를 데이터베이스에 작성 했으므로 어떤 계산이 실패했는지 결정하고 아직 시작되지 않은 계산을 중지해야합니다.프로세스 M Java에서 N 개의 스레드에 대한 계산 속도가 느림

내 접근 방식은 Executors.newFixedThreadPool에 대한 ExecutorService 인터페이스를 사용하는 것입니다. 그러나 계산 결과 중 하나가 실패 (내 예에서는 false를 반환 함)하고 ExecutorService에 제출되었지만 아직 풀에서 스레드가 할당되지 않은 계산을 중지하는 명확한 방법은 없습니다.

깨끗한 방법이 있습니까? 고려해야 할 더 좋은 방법이 있습니까?

import java.util.*; 
import java.util.concurrent.*; 

class Future 
{ 
    static private class MyWorker implements Callable 
    { 
     private Integer item; 
     public MyWorker(Integer item) 
     { 
      this.item = item; 
     } 

     public Boolean call() throws InterruptedException 
     { 
      if (item == 42) 
      { 
       return false; 
      } 
      else 
      { 
       System.out.println("Processing: " + item.toString() + " on " + Thread.currentThread().getName()); 
       Thread.sleep(1000); 
       return true; 
      } 
     } 
    } 

    static int NTHREADS = 2; 

    public static void main(String args[]) 
    { 
     Queue<Integer> numbers = new LinkedList<Integer>();  
     for (int i=1; i<10000; i++) 
     { 
      numbers.add(i); 
     } 

     System.out.println("Starting thread test."); 

     ExecutorService exec = Executors.newFixedThreadPool(NTHREADS); 

     for (Integer i : numbers) 
     { 
      MyWorker my = new MyWorker(i); 
      System.out.println("Submit..." + i.toString()); 
      exec.submit(my); 
      System.out.println("... Done Submit"); 
     } 

     exec.shutdown(); 

     System.out.println("Exiting thread test."); 

    } 
} 

편집 : 여기에 제안의 AFK의 작업을 구현합니다. 여전히 콜백 솔루션을 살펴보고 다른 제안을 기대하십시오. I outline in another answer, 당신은 실패를 통지하고, cancel 제출 된 모든 작업을 할 수 콜백을 사용

Set futures = new HashSet<Future<Boolean>> 
for (Integer i : numbers) 
{ 
    MyWorker my = new MyWorker(i); 
    System.out.println("Submit..." + i.toString()); 
    Future<Boolean> f = exec.submit(my); 
    futures.add(f); 
    System.out.println("... Done Submit"); 
    } 

    for (Future f : futures) { 
    if (!f.get().booleanValue()) { 
     exec.shutdown(); 
    } 
+0

모든 반복에서 수면 상태에서도 신호에 의존하기보다는 끊임없이 주 스레드가 완료를 위해 폴링하는 "통화 대기"상태입니다. 작동할까요? 확실한. 그것은 단지 귀엽지 않다. java.util.concurrent가 제공하는 도구를 사용하면이 비효율적 인 작업이 필요하지 않습니다. – erickson

+0

@erickson : 메인 스레드가 신호를 대기하거나 모든 Future가 완료 될 때까지 기다려야합니까? –

+0

발견 된 ExecutorService.awaitTermination(), 문제가 해결되었습니다. –

답변

2

:

import java.util.*; 
import java.util.concurrent.*; 

class MyFuture 
{ 
    static private class MyWorker implements Callable 
    { 
     private Integer item; 
     public MyWorker(Integer item) 
     { 
      this.item = item; 
     } 

     public Boolean call() 
     { 
      if (item == 42) 
      { 
       return false; 
      } 
      else 
      { 
       System.out.println("Processing: " + item.toString() + " on " + Thread.currentThread().getName()); 
       try 
       { 
        Thread.sleep(1000); 
       } 
       catch (InterruptedException ie) 
       { 
       // Not much to do here except be grumpy they woke us up... 
       } 
       return true; 
      } 
     } 
    } 

    static int NTHREADS = 4; 

    public static void main(String args[]) throws InterruptedException 
    { 
     Queue<Integer> numbers = new LinkedList<Integer>();  
     for (int i=1; i<100; i++) 
     { 
      numbers.add(i); 
     } 

     System.out.println("Starting thread test."); 

     ExecutorService exec = Executors.newFixedThreadPool(NTHREADS); 

     List<Future<Boolean>> futures = new LinkedList<Future<Boolean>>(); 

     for (Integer i : numbers) 
     { 
      MyWorker my = new MyWorker(i); 
      System.out.println("Submit..." + i.toString()); 
      Future<Boolean> f = exec.submit(my); 
      futures.add(f); 
      System.out.println("... Done Submit"); 
     } 

     boolean done = false; 

     while (!done) 
     { 
      Iterator<Future<Boolean>> it = futures.iterator(); 

      while (it.hasNext()) 
      { 
       Future<Boolean> f = it.next(); 
       if (f.isDone()) 
       { 
        try 
        { 
         System.out.println("CHECK RETURN VALUE"); 
         if (f.get()) 
         { 
          it.remove(); 
         } 
         else 
         {     
          System.out.println("IMMEDIATE SHUTDOWN"); 
          exec.shutdownNow(); 
          done = true; 
          break; 
         } 
        } 
        catch (InterruptedException ie) 
        { 
        } 
        catch (ExecutionException ee) 
        { 
        } 
       } 
      } 
      Thread.sleep(1000); 
      if (futures.size() == 0) 
      { 
       done = true; 
      } 
     } 

     exec.shutdown(); 

     System.out.println("Exiting thread test."); 

    } 
} 
+0

콜백이 우아 해 보입니다. 나는 그것을 시험해 볼 것이지만, 그것은 아침을 기다려야 할 것이다 ... –

+0

나는이 작업의 구현을 가지고 있지만, 마지막 작업이 제출되자 마자 메인 스레드가 종료된다.제출 된 모든 작업이 완료되거나 콜백이 실패를 나타낼 때까지 스레드가 차단하는 우아한 방법이 있습니까? –

+0

발견 된 ExecutorService.awaitTermination(), 문제가 해결되었습니다. –

0
class Stopper 
{ 
    boolean stopped = false; 
    ExecutorService exec; 

    public void stop() { if (!stopped) { stopped = true; exec.shutdown(); } } 
} 

static private class MyWorker implements Callable 
{ 
    private Integer item; 
    private Stopper stopper; 
    public MyWorker(Integer item, Stopper stopper) 
    { 
      this.stopper = stopper; 
      this.item = item; 
    } 

    public Boolean call() throws InterruptedException 
    { 
      if (item == 42) 
      { 
        stopper.stop(); 
        return false; 
      } 
      else 
      { 
        System.out.println("Processing: " + item.toString() + " on " + Thread.currentThread().getName()); 
        Thread.sleep(1000); 
        return true; 
      } 
    }  
} 
+0

그건 내 생각 중 하나에 가깝습니다. 스레드가 실행 컨테이너에 대한 지식을 필요로한다는 점에 대해 어떤 버그가 있습니까? 그래도 문제가 해결 될 것입니다. –

+1

정지 된 플래그는 다중 스레드에 의해 액세스되므로 "volatile"이어야합니다. 그러나 이미 종료 된 호출 이후에도 이미 제출 된 모든 작업이 여전히 실행되므로 문제가됩니다. – erickson

1

당신은 Callable 프레임 워크의 Future면을 통합 할 수 있습니다. (예를 들어, Callback 구현 클래스는 각 Future이 추가 된 Collection에 대한 참조를 가질 수 있습니다.) 완료된 (또는 인수의 값에 따라 시작된) 태스크의 경우 아무 것도 수행하지 않습니다. 나머지는 절대로 시작되지 않습니다.

+0

세트의 마지막 작업이 가장 먼저 실행되고 빠르게 실패하면 어떻게됩니까? 나머지 (불필요한) 작업이 완료 될 때까지는 발견되지 않을 것입니다. 왜냐하면 결과를'get '하는 호출이 차단되기 때문입니다. – erickson

+0

좋은 지적. 이를 해결하기 위해 future는 pub/sub 대기열에 넣고 별도의 스레드에서 테스트 할 수 있습니다. – akf

+0

나는 이것을 테스트 할 때 몇 가지 구현 세부 사항을 변경했지만이 기본 접근법을 좋아한다. Set은 정의되지 않은 순서로 요소를 반환하기 때문에 LinkedList를 사용했습니다. 아직 스레드가 할당되지 않은 Future 인스턴스를 반환하는 경우가 많습니다. 또한 get()을 호출하기 전에 isDone()을 확인하여 대기를 피하십시오 (Set Point를 약간 음소거로 만든다). shutdown() 대신 shutdownNow()를 호출합니다. 그렇지 않으면 제출 된 모든 계산이 여전히 완료되기 때문입니다. 코드를 내 질문에 편집으로 게시했습니다. –

관련 문제