2015-01-12 2 views
1

저는 스칼라의 Future와 Akka에 대해 매우 새롭습니다. 현재 독립적 인 작업 목록을 실행하고 함께 결과를 수집하는 애플리케이션을 구현하려고합니다.Future의리스트를 병렬로 실행

예를 들어, 여러 개의 작업으로 구성된 응용 프로그램을 만들고 싶습니다. 각 응용 프로그램은 숫자를 받고 몇 초 동안 잠자기 후 "Hello"메시지를 반환합니다. 주요 객체로 구현됩니다

class HelloActor extends Actor { 
    def receive = { 
    case name:Int => { 
     println("%s will sleep for %s seconds".format(name, name % 4)) 
     Thread.sleep(name % 4 * 1000) 
     sender ! "Hello %s".format(name) 
    } 
    } 
} 

:

배우가 다음과 같이 구현되는 각 작업은 0, 1, 2, 3 초 동안 잠을 하겠지만

object HelloAkka extends App { 
    val system = ActorSystem("HelloSystem") 

    val helloActor = system.actorOf(Props[HelloActor], name = "helloactor") 

    implicit val timeout = Timeout(20, TimeUnit.SECONDS) 

    val futures = (1 to 10).map(num => { 
    helloActor ? num 
    }) 

    val future = Future.sequence(futures) 

    val results = Await.result(future, timeout.duration) 

    println(results) 

    system.shutdown 
} 

, 나는 기대 짧은 수면을 먼저 수행해야하는 작업. 그러나 결과는 다음과 같습니다.

1 will sleep for 1 seconds 
2 will sleep for 2 seconds 
3 will sleep for 3 seconds 
4 will sleep for 0 seconds 
5 will sleep for 1 seconds 
6 will sleep for 2 seconds 
7 will sleep for 3 seconds 
8 will sleep for 0 seconds 
9 will sleep for 1 seconds 
10 will sleep for 2 seconds 
Vector(Hello 1, Hello 2, Hello 3, Hello 4, Hello 5, Hello 6, Hello 7, Hello 8, Hello 9, Hello 10) 

즉 모든 작업이 순서대로 실행됩니다. 대신 모든 작업을 병렬로 실행하는 방법이 있는지 궁금합니다.

+5

모든 요청을 동일한 배우에게 보냅니다. 같은 액터 쌍으로부터 /로 메시지가 순차적으로 실행되도록 보장됩니다. 10 개의 서로 다른 배우 사본을 보내거나 배우 내부의 수면 주위에서 미래 운영자를 이동할 수 있습니다. –

+0

@DiegoMartinoia 고마워요, 여러 배우를 사용할 때 작동합니다. 당신이 그것을 표시 할 수 있도록 답변으로 의견을 게시 할 수 있습니까? –

+0

완료, 인내심에 감사드립니다! –

답변

3

코멘트에서 언급했듯이 모든 작업/메시지를 한 명의 액터로 보내고 있으며이 모든 작업/메시지가 순서대로 처리된다는 것을 보장합니다.

작업을 병렬 처리하려면 HelloActor의 경우 처리기 액터의 인스턴스가 여러 개 있어야합니다.

물론 HelloActor의 인스턴스를 여러 개 만들 수는 있지만 이는 좋은 습관이 아닙니다.

이러한 종류의 작업을 위해서는 빌드 - 인 라우팅 기능을 사용해야합니다.이 기능을 사용하면 작업자/처리기 풀을 관리하고 하나의 router 액터를 통해 작업자/처리기 풀을 상호 작용할 수 있습니다.

val router: ActorRef = 
    context.actorOf(RoundRobinPool(10).props(Props[HelloActor]), "router") 

... 
router ? num 
... 

자세한 내용은 Akka Routing 문서를 참조하십시오.

0

의견과 대답에서 제안한 것처럼 여러 배우를 시작하는 대신 Future에서 실제 작업을 실행하는 것이 좋습니다. 따라서 배우는 작업의 코디네이터와 같을 것입니다. 예컨대는 :

//...  

// import pipe pattern to get access to `pipeTo` method 
import akka.pattern.pipe 
import scala.concurrent.Future 

// the `Future`s will be executed on this dispatcher 
// depending on your needs, you may want to create a 
// dedicated executor for this 
class TaskCoordinatorActor extends Actor { 
    import context.dispatcher 

    def receive = { 
    case name: Int => 
     Future { 
     Thread.sleep(name % 4 * 1000) 
     "Hello %s".format(name) 
     } pipeTo sender() 
    } 
} 

위의 코드는 scala.concurrent.Future 및 파이프에서 원래 보낸 사람에게 그 결과를 귀하의 작업을 실행합니다. 이 방법으로 액터는 작업이 완료 될 때까지 차단하지 않지만 Future이 생성되면 다음 메시지를받을 준비가됩니다.

P .: 일반 정수를 보내는 대신 액터에서 수행 할 작업을 명시 적으로 나타내는 메시지 유형을 만들어야합니다. 예를 들면 다음과 같습니다.

case class Sleep(duration: Duration) 
0

동일한 액터에서 동일한 액터로 보낸 메시지는 순서대로 실행됩니다.

두 가지 옵션이 있습니다.(메모리로 가고, 잘못된 가져 오기를 할 수있다) 그들은 모두 병렬로 실행되도록

하나, 각 메시지의 HelloActor의 새 복사본을 만들거나 다음과 같이 뭔가 당신의 HelloActor을 수정

import akka.pattern.pipe._ 

class HelloActor extends Actor { 
    def receive = { 
    case name:Int => { 
     println("%s will sleep for %s seconds".format(name, name % 4)) 
     Future(sleepAndRespond(name)) pipeTo sender 
    } 
} 

def sleepAndRespond(name:String) = { 
    Thread.sleep(name % 4 * 1000) 
    "Hello %s".format(innerName) 
} 
} 

이렇게하면 순차 부분은 미래의 파이프 일 뿐이며, 10 개의 메시지 각각에 대해 비동기 적으로 실행됩니다.

+0

라우터를 사용하는 것에 대한 @drexin의 제안은 다중 응답 방식을 사용하는 경우 매우 좋은 방법입니다. 또한 두 경우 모두 스레드 풀/디스패처 구성 방법과 성능이 엄격하게 연결되어 있습니다. 자세한 내용은 설명서를 확인하십시오! –