2011-03-10 4 views
5

CompletionService를 사용하여 몇 가지 Future tasks를 제출합니다. FixedThreadPool ExecutorService 2 라운드를 포장 한 다음 제출 된 작업 수와 동일한 루프를 설정하고 completionservice를 사용합니다. take() 그들 모두가 완료되거나 실패 할 때까지 기다린다. 문제는 아주 가끔씩 끝나지 않습니다. (하지만 그 이유는 모르겠습니다.) 그래서 take() 메소드를 설문 (300, Timeout.SECONDS)으로 변경했습니다. 하나의 태스크가 완료되는 데 5 분 이상이 소요된다면 아이디어가됩니다. 설문 조사는 실패 할 것이고 결국에는 루프에서 벗어나서 모든 미래를 거쳐 future.cancel (true)로 전화하여 문제가되는 작업을 취소 할 수 있습니다.CompletionService를 사용하여 너무 오래 걸리는 작업을 취소하는 방법

그러나 코드를 실행하면 코드가 멈추고 폴링이 5 분마다 한 번씩 계속 실행되고 더 이상 작업이 실행되지 않아 두 작업자가 어떤 방식 으로든 교착 상태에 빠졌다고 가정하고 추가 작업을 허용하지 않습니다. 시작한다. 타임 아웃이 5 분이고 루프를 돌리는 데 걸린 시간을 실행하는 데 여전히 1000 개의 작업이 있었기 때문에 작업이 너무 길어 취소되었습니다.

5 분 안에 hasnt가 완료되면 interupt/force cancellation이 현재 작업을 수행하지만 어떤 일도 할 수 없습니다.

이 코드 샘플 임, 당신의 호출 가능 중단을 지원하는 통화 차단에 대한

import com.jthink.jaikoz.exception.JaikozException; 
import java.util.ArrayList; 
import java.util.Collection; 
import java.util.List; 
import java.util.concurrent.*; 

public class CompletionServiceTest 
{ 
    public static void main(final String[] args) 
    { 
     CompletionService<Boolean> cs = new ExecutorCompletionService<Boolean>(Executors.newFixedThreadPool(2)); 
     Collection<Worker> tasks = new ArrayList<Worker>(10); 
     tasks.add(new Worker(1)); 
     tasks.add(new Worker(2)); 
     tasks.add(new Worker(3)); 
     tasks.add(new Worker(4)); 
     tasks.add(new Worker(5)); 
     tasks.add(new Worker(6)); 

     List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(tasks.size()); 
     try 
     { 
      for (Callable task : tasks) 
      { 
       futures.add(cs.submit(task)); 
      } 
      for (int t = 0; t < futures.size(); t++) 
      { 
       Future<Boolean> result = cs.poll(10, TimeUnit.SECONDS); 
       if(result==null) 
       { 
        System.out.println("Worker TimedOut:"); 
        continue; 
       } 
       else 
       { 
        try 
        { 
         if(result.isDone() && result.get()) 
         { 
          System.out.println("Worker Completed:"); 
         } 
         else 
         { 
          System.out.println("Worker Failed"); 
         } 
        } 
        catch (ExecutionException ee) 
        { 
         ee.printStackTrace(); 
        } 
       } 
      } 
     } 
     catch (InterruptedException ie) 
     { 
     } 
     finally 
     { 
      //Cancel by interrupting any existing tasks currently running in Executor Service 
      for (Future<Boolean> f : futures) 
      { 
       f.cancel(true); 
      } 
     } 
     System.out.println("Done"); 
    } 
} 

class Worker implements Callable<Boolean> 
{ 
    private int number; 
    public Worker(int number) 
    { 
     this.number=number; 
    } 

    public Boolean call() 
    { 
     if(number==3) 
     { 
      try 
      { 
       Thread.sleep(50000); 
      } 
      catch(InterruptedException tie) 
      { 

      } 
     } 
     return true; 
    } 
} 

출력 귀하의 작업자 예에서

Worker Completed: 
Worker Completed: 
Worker Completed: 
Worker Completed: 
Worker Completed: 
Worker TimedOut: 
Done 
+0

@ user294896 - 작고 독립적 인 예제로 몇 가지 샘플 코드를 제공 할 수 있습니까? – justkt

+0

@justkt 조금만 걸릴 수도 있습니다 –

+0

왜 작업 자체가 너무 오래 걸리고 중단된다는 것을 깨닫지 못합니까? 이것은 크게 단순화합니다. – trojanfoe

답변

4

시간 제한이 발생하면 기본적으로 해결할 수 있다고 생각합니다. 다음 개체 목록을 반복하여 완료하지 않은 첫 번째 개체를 찾아 강제 취소합니다. 그 우아한 것 같지 않지만 작동하는 것 같습니다.

풀 크기가 변경되어 솔루션을 더 잘 보여주는 출력을 보여 주지만 2 개의 스레드 풀에서도 작동합니다.

import java.util.ArrayList; 
import java.util.Collection; 
import java.util.Date; 
import java.util.List; 
import java.util.concurrent.*; 

public class CompletionServiceTest 
{ 
    public static void main(final String[] args) 
    { 
     CompletionService<Boolean> cs = new ExecutorCompletionService<Boolean>(Executors.newFixedThreadPool(1)); 
     Collection<Worker> tasks = new ArrayList<Worker>(10); 
     tasks.add(new Worker(1)); 
     tasks.add(new Worker(2)); 
     tasks.add(new Worker(3)); 
     tasks.add(new Worker(4)); 
     tasks.add(new Worker(5)); 
     tasks.add(new Worker(6)); 

     List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(tasks.size()); 
     try 
     { 
      for (Callable task : tasks) 
      { 
       futures.add(cs.submit(task)); 
      } 
      for (int t = 0; t < futures.size(); t++) 
      { 
       System.out.println("Invocation:"+t); 
       Future<Boolean> result = cs.poll(10, TimeUnit.SECONDS); 
       if(result==null) 
       { 
        System.out.println(new Date()+":Worker Timedout:"); 
        //So lets cancel the first futures we find that havent completed 
        for(Future future:futures) 
        { 
         System.out.println("Checking future"); 
         if(future.isDone()) 
         { 
          continue; 
         } 
         else 
         { 
          future.cancel(true); 
          System.out.println("Cancelled"); 
          break; 
         } 
        } 
        continue; 
       } 
       else 
       { 
        try 
        { 
         if(result.isDone() && !result.isCancelled() && result.get()) 
         { 
          System.out.println(new Date()+":Worker Completed:"); 
         } 
         else if(result.isDone() && !result.isCancelled() && !result.get()) 
         { 
          System.out.println(new Date()+":Worker Failed"); 
         } 
        } 
        catch (ExecutionException ee) 
        { 
         ee.printStackTrace(System.out); 
        } 
       } 
      } 
     } 
     catch (InterruptedException ie) 
     { 
     } 
     finally 
     { 
      //Cancel by interrupting any existing tasks currently running in Executor Service 
      for (Future<Boolean> f : futures) 
      { 
       f.cancel(true); 
      } 
     } 
     System.out.println(new Date()+":Done"); 
    } 
} 

class Worker implements Callable<Boolean> 
{ 
    private int number; 
    public Worker(int number) 
    { 
     this.number=number; 
    } 

    public Boolean call() 
     throws InterruptedException 
    { 
     try 
     { 
      if(number==3) 
      { 
       Thread.sleep(50000); 
      } 
     } 
     catch(InterruptedException ie) 
     { 
      System.out.println("Worker Interuppted"); 
      throw ie; 
     } 
     return true; 
    } 
} 

출력 당신은 항상 다음 future.cancel()를 호출 할 수 있습니다 ... 아직 완료되지 않은 경우 future.get(timeout...)
그것은 시간 제한 예외를 반환합니다 호출 할 수 있습니다

Invocation:0 
Thu Mar 10 20:51:39 GMT 2011:Worker Completed: 
Invocation:1 
Thu Mar 10 20:51:39 GMT 2011:Worker Completed: 
Invocation:2 
Thu Mar 10 20:51:49 GMT 2011:Worker Timedout: 
Checking future 
Checking future 
Checking future 
Cancelled 
Invocation:3 
Worker Interuppted 
Invocation:4 
Thu Mar 10 20:51:49 GMT 2011:Worker Completed: 
Invocation:5 
Thu Mar 10 20:51:49 GMT 2011:Worker Completed: 
Thu Mar 10 20:51:49 GMT 2011:Done 
2

이야기 무엇의 단순화 된 버전을 보여줍니다. 실수 코드가 내장 잠금 (synchronized 블록)에서 교착 상태 인 경우 중단을 통해 취소 할 수 없습니다. 대신 명시 적 잠금 (java.util.concurrent.Lock)을 사용하면 잠금 획득을 기다리는 시간을 지정할 수 있습니다. 스레드가 교착 상태를 겪었 기 때문에 스레드가 잠금 대기 중 시간이 초과되면 오류 메시지와 함께 중단 될 수 있습니다.

귀하의 예에서 귀하의 CallableInterruptedException을 삼켜서는 안됩니다. 그것을 전달하거나 (다시 던지거나 메소드 선언의 throws 행에 InterruptedException을 추가해야 함) catch 블록에서 스레드의 중단 된 상태를 재설정하십시오 (Thread.currentThread().interrupt()).

+0

작업자 코드는 코드를 실행할 수 있도록 실제 작업자를 나타내는 코드가 아닙니다. 문제는 CompletionService/ExecutorService와 관련이 있습니다. 문제가있는 서비스를 취소하려면 어떻게해야합니까? 미래에 어떤 서비스가 실행 중인지 알 수 없을 때 문제가 있습니다. –

+0

ps : 코드의 전략적 지점에서 Thread.currentThread(). isInterrupted()도 확인해야합니다. InterruptedException는, 일반적으로, InterruptedException를 서포트하고 있지 않는 한, throw되지 않습니다. isInterrupted() 메소드를 사용하여 인터럽트가 발생했는지 확인하고, 태스크와 정리 후에 중지하십시오. – Matt

1

입니다.

관련 문제