2014-12-18 2 views
5

필자는 고안된 코드 예제를 작성했으며 다른 사람이 사용해야하는 코드가 아닐 수도 있지만 이어야한다고 생각합니다. 그러나 대신 교착 상태가됩니다. 나는 here이라고 기술 된 대답을 읽었으나 그것들이 불충분하다는 것을 알았다.thenComposeAsync는 회수 할 반환을 기다리고 있습니다

import java.util.concurrent.CompletableFuture; 
import java.util.concurrent.Executor; 
import java.util.concurrent.Executors; 

public class Test { 

    public static void main(String argv[]) throws Exception { 

     int nThreads = 1; 
     Executor executor = Executors.newFixedThreadPool(nThreads); 



     CompletableFuture.completedFuture(true) 
      .thenComposeAsync((unused)->{ 

       System.err.println("About to enqueue task"); 
       CompletableFuture<Boolean> innerFuture = new CompletableFuture<>(); 
       executor.execute(() -> { 

        // pretend this is some really expensive computation done asynchronously 

        System.err.println("Inner task"); 
        innerFuture.complete(true); 
       }); 
       System.err.println("Task enqueued"); 

       return innerFuture; 
      }, executor).get(); 

     System.err.println("All done"); 
     System.exit(0); 

    } 

} 

이 인쇄 : 여기

은 코드 예제입니다 작업을 대기열에

소개 작업이

을 큐에

그리고 다음이 중단됩니다. 집행자는 단 하나의 스레드 만 가지고 있기 때문에 교착 상태에 빠졌으며 innerFuture가 상환 될 때까지 기다리고 있습니다. 왜 "thenComposeAsync"가 여전히 불완전한 미래를 반환하고 executor에서 스레드를 풀지 않고 반환 값을 상환 할 수 없게 차단합니까?

완전히 직관적이지 않으며 javadocs가 도움이되지 않습니다. CompletionStages가 어떻게 작동하는지 근본적으로 오해하고 있습니까? 아니면 구현의 버그입니까?

답변

2

재미있는 대화를 많이 한 후 JDK 작성자 중 한 명에게 이메일을 보내기로했습니다. 이 동작이 의도되지 않았으며 실제로 1.8u25에 버그가 있음을 알았습니다. Java 8의 최신 패치 버전으로 출시 될 수정 사항이 있습니다. 어떤 버전인지는 알 수 없습니다. 새로운 동작을 테스트하고 싶은 누군가를 위해, 당신은 여기에서 최신 jsr166 항아리를 다운로드 할 수 있습니다

http://gee.cs.oswego.edu/dl/concurrency-interest/index.html

1

첫째, 나를 쉽게 무슨 일이 일어나고 있는지 볼 수 있도록하기 위해이 개 정적 기능을 가진 코드를 다시 보자

// Make an executor equivalent to Executors.newFixedThreadPool(nThreads) 
// that will trace to standard error when a task begins or ends 
static ExecutorService loggingExecutor(int nThreads) { 
    return new ThreadPoolExecutor(nThreads, nThreads, 
      0L, TimeUnit.MILLISECONDS, 
      new LinkedBlockingQueue<>()) { 

       @Override 
       protected void beforeExecute(Thread t, Runnable r) { 
        System.err.println("Executor beginning task on thread: " 
         + t.getName()); 
       } 

       @Override 
       protected void afterExecute(Runnable r, Throwable t) { 
        System.err.println("Executor finishing task on thread: " 
         + Thread.currentThread().getName()); 
       } 

      }; 
} 

그리고

// same as what you pass to thenComposeAsync 
static Function<Boolean, CompletableFuture<Boolean>> inner(Executor executor) { 
    return b -> { 
     System.err.println(Thread.currentThread().getName() 
        + ": About to enqueue task"); 
     CompletableFuture<Boolean> innerFuture = new CompletableFuture<>(); 
     executor.execute(() -> { 
      System.err.println(Thread.currentThread().getName() 
        + ": Inner task"); 
      innerFuture.complete(true); 
     }); 
     System.err.println(Thread.currentThread().getName() 
        + ": Task enqueued"); 

     return innerFuture; 
    }; 
} 

을 다음과 같이 이제 우리는 테스트 케이스를 작성할 수 있습니다 :

ExecutorService e = loggingExecutor(1); 

CompletableFuture.completedFuture(true) 
     .thenComposeAsync(inner(e), e) 
     .join(); 

e.shutdown(); 

/* Output before deadlock: 
Executor beginning task on thread: pool-1-thread-1 
pool-1-thread-1: About to enqueue task 
pool-1-thread-1: Task enqueued 
*/ 

결과가 나오기 전까지 첫 번째 스레드가 해제되지 않았다는 결론을 테스트 해 봅시다. econd 미래는 계산 :

ExecutorService e = loggingExecutor(1); 

CompletableFuture<Boolean> future = 
     CompletableFuture.completedFuture(true) 
     .thenComposeAsync(inner(e), e); 

System.err.println("thenComposeAsync returned"); 

future.join(); 

e.shutdown(); 

/* 
thenComposeAsync returned 
Executor beginning task on thread: pool-1-thread-1 
pool-1-thread-1: About to enqueue task 
pool-1-thread-1: Task enqueued 
*/ 

thenComposeAsync하셨습니까 : 실제로

ExecutorService e = loggingExecutor(2); // use 2 threads this time 

CompletableFuture.completedFuture(true) 
     .thenComposeAsync(inner(e), e) 
     .join(); 

e.shutdown(); 

/* 
Executor beginning task on thread: pool-1-thread-1 
pool-1-thread-1: About to enqueue task 
pool-1-thread-1: Task enqueued 
Executor beginning task on thread: pool-1-thread-2 
pool-1-thread-2: Inner task 
Executor finishing task on thread: pool-1-thread-2 
Executor finishing task on thread: pool-1-thread-1 
*/ 

, 스레드 2의 당신이 바로 그 thenComposeAsync 자체를 차단하는 경우 보자

가 완료 될 때까지 스레드 (1)가 개최됩니다 나타납니다 '블록. CompletableFuture이 즉시 반환되었으며 교착 상태는 완료 할 때만 발생했습니다. 따라서 .thenComposeAsync(inner(e), e)에 의해 반환 된 미래를 완료하는 데 무엇이 필요합니까?

  1. API는 CompletableFuture<Boolean>
  2. 그것은 또한 완전한 반환 CompletableFuture<Boolean> 때까지 기다릴 필요가 돌아 innner (예)를 기다릴 필요가있다. 그래야만 미래가 완성됩니다. 보시다시피, 불완전한 미래를 제안하고 돌려 줄 수는 없습니다.

버그입니까? 내부 작업을 계산하는 동안 CompletionStage가 스레드 1을 유지하는 이유는 무엇입니까? 당신이 지적했듯이 문서가 꽤 모호하고 특정 순서로 쓰레드를 릴리즈 할 것을 약속하지 않기 때문에 버그가 아닙니다. 또한 Thread1은 CompletableFuture의 후속 then*() 메서드에 사용됩니다.다음 고려 : 당신이 볼 수 있듯이

ExecutorService e = loggingExecutor(2); 

CompletableFuture.completedFuture(true) 
     .thenComposeAsync(inner(e), e) 
     .thenRun(() -> System.err.println(Thread.currentThread().getName() 
         + ": All done")) 
     .join(); 

e.shutdown(); 

/* 
Executor beginning task on thread: pool-1-thread-1 
pool-1-thread-1: About to enqueue task 
pool-1-thread-1: Task enqueued 
Executor beginning task on thread: pool-1-thread-2 
pool-1-thread-2: Inner task 
Executor finishing task on thread: pool-1-thread-2 
pool-1-thread-1: All done 
Executor finishing task on thread: pool-1-thread-1 
*/ 

, .thenRun (...)가 스레드 1에서 실행되었다 나는이 다른 * 비동기 CompletableFuture의 (..., 집행자 간부) 방법과 일치한다 생각합니다.

그러나 thenComposeAsync의 기능을 스레드를 저글링하기 위해 API로 두지 않고 별도로 제어 할 수있는 2 개의 단계로 나누려면 어떻게해야합니까? 다음과 같이하면됩니다 :

ExecutorService e = loggingExecutor(1); 

completedFuture(true) 
     .thenApplyAsync(inner(e), e) // do the async part first 
     .thenCompose(x -> x)   // compose separately 
     .thenRun(() -> System.err.println(Thread.currentThread().getName() 
         + ": All done")) 
     .join(); 

e.shutdown(); 

교착 상태가없는 1 개의 스레드에서 모든 것이 잘 실행됩니다.

결론적으로이 말은 당신이 말하는 것처럼 직관적이지 않습니까? 나는 모른다. 나는 thenComposeAsync가 왜 존재하는지 상상할 수 없다. 메서드가 CompletableFuture을 반환하면 차단해서는 안되며 비동기 적으로 호출 할 이유가 없어야합니다.

+0

감사 미샤 세부 사항 및 노력에 대한. 당신의 본보기는 나의 것보다 훨씬 뚜렷합니다. 나는 당신의 진술에 대해 머리를 감쌀 필요가있다. 구체적으로 : 1. API는 Innner (e)가 CompletableFuture를 반환 할 때까지 기다려야합니다. 2. 반환 된 CompletableFuture 도 완료 될 때까지 기다려야합니다. 그러면 앞으로 만 완료됩니다. 당신이 제안한 것을 할 수없고 불완전한 미래를 되돌릴 수 없습니다. " 나는 당신이 볼 수 있듯이 정신적 인 도약을 함축하지 않습니다. 나는 이것을 조금 더 생각해 볼 것입니다. – sethwm

관련 문제