2014-09-10 2 views
1

배치 작업을 수행하는 스칼라 함수가 있습니다. 자체 동기화되도록 설계되었으므로 하나의 시스템 또는 클러스터에서 1 인스턴스 또는 1000 인스턴스를 실행할 수 있으며 외부 미들웨어를 사용하여 동기화됩니다.스칼라 배치 작업을 병렬 처리합니다.

성능면에서는 하나의 JVM이 여러 스레드에서이 기능을 실행하고 싶습니다. (나는 이것을 동일한 JVM에서 RAM을 보존하기 위해 수행한다.) 이상적으로 코드는 다음과 같이 표시되어야합니다.

execInParallel(9, myBatchFunction) // Starts 9 threads and invokes myBatchFunction() in each one 

이 작업을 수행하는 간단한 방법은 무엇입니까?

답변

1

비 - 블로킹 버전 :

import scala.concurrent._ 
import java.util.concurrent._ 
import collection.JavaConverters._ 

def execInParallel[T](numberThreads: Int, body: => T): util.List[Future[T]] = { 
    val javaExecutor = Executors.newFixedThreadPool(numberThreads) // fixed thread pool 
    val collections = Seq.fill(numberThreads) { 
    new Callable[T]() { 
     def call = body 
    } 
    } 
    val futures = javaExecutor.invokeAll(collections.asJavaCollection) // run pool, first convert Seq to java.util.Collection 
    // Here you have to be sure, that all task run 
    import ExecutionContext.Implicits.global 
    concurrent.Future(javaExecutor.shutdown()) // shutdown in new thread 
    futures // return java futures !!! 
} 

val futures = execInParallel(9, Thread.currentThread.getName) 
println("Hurray") 
futures.asScala.foreach(x => println(x.get)) 
+0

코드가 멋지지만, 여기 내 목표는 코드를 가져 오는 것이 아니라 _learn_입니다. 당신이 한 일과 어떻게 작동하는지 설명해 주시겠습니까? – SRobertJames

+0

java.util.concurrent 패키지 또는 RxJava (RxScala는 래퍼)를 확인하십시오. 필요한 모든 것을 찾을 수 있습니다. –

+0

그게 오타입니까? "asJavaColRxScalalection"은 "asJavaCollection"이어야합니다. – SRobertJames

0

이 작동 할 수 있습니다

(1 to 10).toList.par.foreach(myBatchFunction) 

그것은 병렬로 할 때 스칼라 확인할 수 있도록 myBatchFunction 10 번 호출합니다.

+0

10 개의 스레드를 만들지 확실하지 않습니다. 코어 수와 관련이있을 수 있습니다. – samthebest

관련 문제