2014-03-02 1 views
3

람다 표현식은 메모리와 같은 특정 양의 리소스를 소비한다고 가정하고 동시 실행 수 (예 : 람다가 로컬 메모리의 100MB를 일시적으로 소비하고이를 1GB로 제한하려는 경우 10 개의 동시 평가를 허용하지 않습니다).병렬 스트림에서의 동시 평가 (fixedThreadPool과 같은)의 수를 제한하는 최선의 방법은 무엇입니까?

동시 실행의 수를 제한 할 수있는 가장 좋은 방법은 무엇입니까,

IntStream.range(0, numberOfJobs).parallel().foreach(i -> { /*...*/ }); 

, 예를 들어 말을?

참고 : 확실한 옵션은

double jobsPerThread = (double)numberOfJobs/numberOfThreads; 
    IntStream.range(0, numberOfThreads).parallel().forEach(threadIndex -> 
     IntStream.range((int)(threadIndex * jobsPerThread), (int)((threadIndex+1) * jobsPerThread)).sequential().forEach(i -> { /*...*/ })); 

이 유일한 방법입니다 같은 중첩을 수행하는 것입니다? Tt는 이 아니며 우아한입니다. 사실은 내가 쉽게 할 수있는 CompletableFuture 유틸리티 방법을 사용하여, 사용 사례에 따라,

IntStream.range(0, numberOfJobs).parallel(numberOfThreads).foreach(i -> { /*...*/ }); 
+1

왜 당신이 공유 고정 스레드 풀을 사용할 수 없습니다 : 다음 예는 2의 고정 병행 한 번 동일한 작업을 실행하는 기본 동작을 사용하여 한 번 사용자 정의 풀을 사용하여이 보여? 평행 한 시내는 무엇을 복잡하게합니까? – Gray

+0

Java 6에서는 공유 된 고정 스레드 풀을 사용했지만 Java 8 코드는 훨씬 간결합니다. 스레드 풀을 사용하면 Future의 ArrayList를 정의하고 Executor에게 작업자를 제출하고 Futures에서 결과를 수집해야합니다. 이것이 주된 동기이지만, 스트림이 더 효율적이라는 인상을 받았습니다 (약 10 % 정도). –

+2

http://stackoverflow.com/questions/21163108/custom-thread-pool-in-java-8-parallel-stream –

답변

2

을 가지고 싶다 :

import static java.util.concurrent.CompletableFuture.runAsync; 

ExecutorService executor = Executors.newFixedThreadPool(10); //max 10 threads 
for (int i = 0; i < numberOfJobs; i++) { 
    runAsync(() -> /* do something with i */, executor); 
} 

//or with a stream: 
IntStream.range(0, numberOfJobs) 
     .forEach(i -> runAsync(() -> /* do something with i */, executor)); 

코드와 가장 큰 차이점은 그 병렬 forEach만이 반환 마지막 작업이 끝난 후 runAsync이 모든 작업이 제출되면 즉시 반환됩니다. 필요한 경우 해당 동작을 변경하는 다양한 방법이 있습니다.

+0

이 방법도 좋습니다. 그러나 완료하려면 (즉, 다른 솔루션과 비교해 볼 때) 모든 스레드를 연결하는 데 필요한 코드 행을 추가해야합니다. ExecutorService로 작업하는 것이 좀 더 길어지는 것은 실제로이 추가 코드입니다. –

+1

예 - 사용 사례에 따라 'thenXXX' 메소드 중 하나를 사용할 수도 있고 모든 스레드에서 결합하는 실행 프로그램을 종료 할 수도 있습니다. 그것이 작동하지 않는다면 아마 미래를 검색해야 할 것이고 그것은 스트림을 사용하지 않는 코드에 매우 가까워 질 것입니다. – assylias

4

Stream은 병렬 작업에 ForkJoinPool을 사용합니다. 기본적으로 그들은 ForkJoinPool.commonPool()을 사용하고 있으며, 이후에는 동시성을 변경할 수 없습니다. 그러나 자신 만의 ForkJoinPool 인스턴스를 사용할 수 있습니다. 자신의 ForkJoinPool 컨텍스트 내에서 스트림 코드를 실행하면이 컨텍스트 풀이 스트림 작업에 사용됩니다.

import java.util.HashSet; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ForkJoinPool; 
import java.util.stream.IntStream; 

public class InterfaceStaticMethod { 
    public static void main(String[] arg) throws Exception { 
     Runnable parallelCode=() -> { 
     HashSet<String> allThreads=new HashSet<>(); 
     IntStream.range(0, 1_000_000).parallel().filter(i->{ 
      allThreads.add(Thread.currentThread().getName()); return false;} 
     ).min(); 
     System.out.println("executed by "+allThreads); 
     }; 
     System.out.println("default behavior: "); 
     parallelCode.run(); 
     System.out.println("specialized pool:"); 
     ForkJoinPool pool=new ForkJoinPool(2); 
     pool.submit(parallelCode).get(); 
    } 
} 
+1

흥미 롭다 - 나는'parallel()'이 항상 자신의 풀 (ForkJoin 공통 풀)을 사용한다고 생각 했겠지만 그렇지 않다. – assylias

+3

'parallel()'에서 사용하는 풀은 지정되지 않았습니다.현재의 동작은 여기에 설명되어 있지만 향후 변경 될 수 있습니다. –

관련 문제