1

나는 다음과 같은 코드가 있습니다컨텍스트 스레드에서 CompletableFuture # whenComplete 실행을 방지하는 방법

ConcurrentHashMap taskMap= new ConcurrentHashMap(); 
.... 
taskMap.compute(key, (k, queue) -> { 
     CompletableFuture<Void> future = (queue == null) 
       ? CompletableFuture.runAsync(myTask, poolExecutor) 
       : queue.whenCompleteAsync((r, e) -> myTask.run(), poolExecutor); 
     //to prevent OutOfMemoryError in case if we will have too much keys 
     future.whenComplete((r, e) -> taskMap.remove(key, future));    
     return future; 
    }); 

future 이미 완료 whenComplete 함수 인수의 경우 compute를 발동과 같은 스레드에서 호출하는 코드의 문제를. 이 메서드의 본문에서는 맵에서 항목을 제거합니다. 그러나이 메서드 및 응용 프로그램의 고정을 금지하는 메서드 설명서를 계산하십시오.

어떻게이 문제를 해결할 수 있습니까?

답변

0

가장 확실한 해결책은 whenComplete 대신 whenCompleteAsync 대신에 Executor을 사용하여 작업을 실행하는 것이 가장 안전합니다. 어떤 일찍 완료 상당한 가능성이있는 경우 그래서 그냥

taskMap.compute(key, (k, queue) -> { 
     CompletableFuture<Void> future = (queue == null) 
       ? CompletableFuture.runAsync(myTask, poolExecutor) 
       : queue.whenCompleteAsync((r, e) -> myTask.run(), poolExecutor); 
     //to prevent OutOfMemoryError in case if we will have too much keys 
     future.whenCompleteAsync((r, e) -> taskMap.remove(key, future), poolExecutor); 
     return future; 
    }); 

를 사용할 수

*** with already completed 
job scheduled 
job scheduled 
created in Thread[main,5,main] 
processed in Thread[Thread-0,5,main] 
consumed in Thread[Thread-1,5,main] 
*** with async 
job scheduled 
job scheduled 
job scheduled 
created in Thread[Thread-2,5,main] 
processed in Thread[Thread-3,5,main] 
consumed in Thread[Thread-4,5,main] 

뭔가를 인쇄 할

Executor ex = r -> { System.out.println("job scheduled"); new Thread(r).start(); }; 
for(int run = 0; run<2; run++) { 
    boolean completed = run==0; 
    System.out.println("*** "+(completed? "with already completed": "with async")); 
    CompletableFuture<String> source = completed? 
     CompletableFuture.completedFuture("created in "+Thread.currentThread()): 
     CompletableFuture.supplyAsync(() -> { 
      LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1)); 
      return "created in "+Thread.currentThread(); 
     }, ex); 

    source.thenApplyAsync(s -> s+"\nprocessed in "+Thread.currentThread(), ex) 
      .whenCompleteAsync((s,t) -> { 
       if(t!=null) t.printStackTrace(); else System.out.println(s); 
       System.out.println("consumed in "+Thread.currentThread()); 
      }, ex) 
      .join(); 
} 

으로 입증 할 수있다, 당신은 사용하여 오버 헤드를 줄일 수

taskMap.compute(key, (k, queue) -> { 
     CompletableFuture<Void> future = (queue == null) 
       ? CompletableFuture.runAsync(myTask, poolExecutor) 
       : queue.whenCompleteAsync((r, e) -> myTask.run(), poolExecutor); 
     //to prevent OutOfMemoryError in case if we will have too much keys 
     if(future.isDone()) future = null; 
     else future.whenCompleteAsync((r, e) -> taskMap.remove(key, future), poolExecutor); 
     return future; 
    }); 

아마도 이미 다른 작업에서 완료가 발생하더라도 종속 작업이 항상 풀에 대한 새 작업으로 예약되는 것을 좋아하지 않으므로이 명백한 해결 방법을 사용하지 않았을 수 있습니다. 당신은 필요한 작업을 다시 예약하는 전문 집행자와 함께이 문제를 해결할 수 :

Executor inPlace = Runnable::run; 
Thread forbidden = Thread.currentThread(); 
Executor forceBackground 
     = r -> (Thread.currentThread()==forbidden? poolExecutor: inPlace).execute(r); 

… 

future.whenCompleteAsync((r, e) -> taskMap.remove(key, future), forceBackground); 

하지만 당신은이 복잡한 당 매핑 정리 논리가 정말 원하는 여부를 재고 할 수있다. 복잡 할뿐만 아니라 상당한 오버 헤드를 유발할 수 있으므로 실행될 때 이미 구식 일 때 실제로 필요하지 않은 정리 작업을 계획 할 수 있습니다.

정리에 수시로 전체지도를

taskMap.values().removeIf(CompletableFuture::isDone); 

을 실행하는 것이 훨씬 간단하고 훨씬 더 효율적으로 될 수 있습니다.

관련 문제