2017-02-21 4 views
4

다음 예제 코드에서는 완료 가능한 미래 집합의 완료 작업으로 100 밀리 동안 잠을자는 biconsumer을 삽입하고 있습니다. 나는 executorService을 사용하여 whenCompleteAsync 메서드를 사용했습니다. executorService 만 6 출력이 인쇄되는 심지어 내가 100 개 선물을 완료거야하지만, 코어 풀 크기 5, 최대 크기 (5) 내가 코드를 실행 1.CompletableFuture의 whenCompleteAsync 호출에서 Throw 된 RejectedExecutionException을 어떻게 캡처 할 수 있습니까?

public class CompleteTest { 
    public static void main(String[] args) { 
     ExecutorService executorService = new ThreadPoolExecutor(5, 5, 10, 
       TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)); 

     ArrayList<CompletableFuture<String>> list = new ArrayList<>(); 

     for (int i = 0; i <100; i++) { 
      CompletableFuture<String> stringCompletableFuture = new CompletableFuture<>(); 
      stringCompletableFuture.whenCompleteAsync((e, a) -> { 
       System.out.println("Complete " + e); 
       try { 
        Thread.sleep(100); 
       } catch (InterruptedException e1) {e1.printStackTrace();} 
      }, executorService); 

      list.add(stringCompletableFuture); 
     } 

     for (int i = 0; i < list.size(); i++) { 
      list.get(i).complete(i + ""); 
     } 
    } 
} 

의 큐의 길이가 ThreadPoolExecutor입니다. 이것은 5 개의 코어 스레드와 1 개의 대기열 스레드입니다. 나머지는 어떻게됩니까? 큐가 이미 꽉 차서 다른 실행 파일을 실행 프로그램에 제출할 수 없으면 예외가 없어야합니다.

출력

Complete 0 
Complete 1 
Complete 2 
Complete 3 
Complete 4 
Complete 5 

답변

5

것은 예외가 발생되며, CompletableFuture은 그냥 모든 사람들의 당신이 추적하고, 예외적으로 완료됩니다.

단순히 RejectedExecutionHandler을 사용하는 생성자를 사용하여 ThreadPoolExecutor을 인스턴스화하고 초기화하면 예외가 발생합니다. ExecutorService이 작업을 수락 할 수없는 경우 RejectedExecutionException이 발생합니다. 그렇다면 태스크가 추가되고 어디에서 예외가 발생합니까?

모든 연쇄는 whenCompleteAsync 안에 있습니다. 전화를 걸면 수신자 CompletableFuture, stringCompletableFuture에 종속 항목을 추가합니다. stringCompletableFuture이 완료되면 (이 경우 성공적으로) CompletableFuture (결과로 반환되는)이 새로 만들어지고 주어진 ExecutorService에 주어진 BiConsumer을 예약하려고 시도합니다.

ExecutorService의 대기열에 공백이 없으므로 RejectedExecutionHandler을 호출하여 RejectedExecutionException을 가져옵니다. 이 예외는 그 시간에 캡처되어 completeExceptionallyCompletableFuture에 반환됩니다.

즉, for 루프에서는 CompletableFuturewhenCompleteAsync에 의해 반환되고이를 저장하고 그 상태를 인쇄합니다.

ArrayList<CompletableFuture<String>> list = new ArrayList<>(); 
ArrayList<CompletableFuture<?>> dependents = new ArrayList<>(); 
for (int i = 0; i <100; i++) { 
    CompletableFuture<String> stringCompletableFuture = new CompletableFuture<>(); 
    CompletableFuture<?> thisWillHaveException = stringCompletableFuture.whenCompleteAsync((e, a) -> { 
     System.out.println("Complete " + e); 
     try { 
      Thread.sleep(100); 
     } catch (InterruptedException e1) {e1.printStackTrace();} 
    }, executorService); 
    dependents.add(thisWillHaveException); 
    list.add(stringCompletableFuture); 
} 

for (int i = 0; i < list.size(); i++) { 
    list.get(i).complete(i + ""); 
} 
Thread.sleep(2000); 
dependents.forEach(cf -> { 
    cf.whenComplete((r, e) -> { 
     if (e != null) 
      System.out.println(cf + " " + e.getMessage()); 
    }); 
}); 

당신은 그들이 (성공적으로 이전에 인쇄 된 6 제외)의 모든 것을 알 수는 RejectedExecutionException와 매우 완료.

... 
[email protected][Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0] 
[email protected][Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0] 
[email protected][Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0] 
관련 문제