2017-10-31 3 views
0

다음 코드를 확장하여 액터 요청 요청에 대해 알 수없는 액티비티로 작동하도록했습니다.알 수없는 액터의 처리 결과가 단일 타임 아웃

implicit val timeout = Timeout(100 millis) 
val sendRequestActor = context.actorOf(Props(new SendRequest(request)), "Send_Request_".concat(getActorNumber)) 
val sendRequestActor2 = context.actorOf(Props(new SendRequest(request)), "Send_Request_".concat(getActorNumber)) 
val a1 = ask(sendRequestActor, Request).fallbackTo(Future.successful(RequestTimeout)) 
val a2 = ask(sendRequestActor2, Request).fallbackTo(Future.successful(RequestTimeout)) 
val result = for { 
    r1 <- a1 
    r2 <- a2 
} yield(r1, r2) 

val r = Await.result(result, 100 millis) 
r match { 
    case (b: SuccessResponse, b2: SuccessResponse) => { 
    //Process Results 
    } 
    case (b: SuccessResponse, b2: RequestTimeout) => { 
    //Process Results 
    } 
    case (b: RequestTimeout, b2: SuccessResponse) => { 
    //Process Results 
    } 
    case (b: RequestTimeout, b2: RequestTimeout) => { 
    //Process Results 
    } 
    case _ => {} 
} 

(이전 데이터베이스 호출에서받은) 수신자 목록에 요청을 보내려고합니다. 수신자 수는이 기능이 호출 될 때마다 달라집니다. 수신자는 요청을 시간 초과하고 RequestTimeout을 기록하기 전에 응답하는 데 최대 100 밀리 초가 걸립니다. 수신자가 응답하는 경우 SendRequest 배우는 SuccessResponse으로 회신합니다. 나는 val result for-loop를 목록을 처리하도록 변경해야한다고 가정하고 있지만, 모든 액터가 반환하거나 타임 아웃이 안타 될 때까지 최소한의 시간을 기다릴 수 있도록 모든 것을 구조화하는 방법을 잘 모르겠습니다. 낮음). 예제와 같이 하나의 반환 값으로 모든 것을 필요로하지는 않습니다. 각 반복마다 결과 목록과 일치하는 형식의 목록이 좋습니다.

도움이 되셨다면 다른 정보를 제공해 줄 수 있으면 알려 주시기 바랍니다.

콜링 클래스 :

case object GetResponses 

def main(args: Array[String]) { 

val route = { 
    get { 
    complete { 
     //stuff 
     val req_list = List(req1,req2,req3) 
     val createRequestActor = system.actorOf(Props(new SendAll(req_list)), "Get_Response_Actor_" + getActorNumber) 
     val request_future = ask(createRequestActor, GetResponses).mapTo[List[Any]] 
     Thread.sleep(1000) 
     println(request_future) 
     //more stuff 
    } 
    } 
} 


Http().bindAndHandle(route, "localhost", 8080) 
} 

보내기 업데이트 된 클래스 :

class SendAll(requests: List[Request]) extends Actor { 
    import context.{become,dispatcher} 
    var numProcessed = 0 
    var results: List[Any] = List() 
    requests.foreach(self ! _) 

    implicit val timeout = Timeout(100 millis) 
    def receive = { 

    case r: RequestMsg => 
     val sendRequestActor = context.actorOf(Props(new SendRequest(r)), "Send_Request_".concat(getActorNumber)) 
     (sendRequestActor ? Request).pipeTo(self) 

    case s: SuccessResponse => 
     println("Got Success") 
     results = results :+ s 
     println(results.size + " == " + requests.size) 
     if(results.size == requests.size) { 
     println("Before done") 
     become(done) 
     } 

    case akka.actor.Status.Failure(f) => 
     println("Got Failed") 
     results = results :+ RequestTimeout 
     if(results.size == requests.size) { 
     become(done) 
     } 

    case m => 
     println("Got Other") 

    } 

    def done: Receive = { 
    case GetResponses => 
     println("Done") 
     sender ! results 
    case _ => { 
     println("Done as well") 
    } 
    } 
} 

출력

Got Success 
1 == 3 
Got Success 
2 == 3 
Got Success 
3 == 3 
Before done 
Future(<not completed>) 
,691

편집 주셔서 감사합니다

답변

1

요청자 목록을 액터에 전달한 다음 대신에 자식 액터의 응답을 self으로 pipe 전달합니다. 당신은 요청의 모든 목록은 배우를 인스턴스화 할

class Handler(requests: List[RequestMsg]) extends Actor { 
    import context.{become, dispatcher} 
    var numProcessed = 0 
    var results: List[Any] = List() 
    requests.foreach(self ! _) 

    implicit val timeout = Timeout(100.millis) 

    def receive = { 
    case r: RequestMsg => 
     val sendRequestActor = context.actorOf(Props(new SendRequest(r)), "Send_Request".concat(getActorNumber)) 
     (sendRequestActor ? Request).pipeTo(self) 

    case s: SuccessResponse => 
     println(s"response: $s") 
     results = results :+ s 
     if (results.size == requests.size) 
     become(done) 

    case akka.actor.Status.Failure(f) => 
     println("a request failed or timed out") 
     results = results :+ RequestTimeout 
     if (results.size == requests.size) 
     become(done) 

    case m => 
     println(s"Unhandled message received while processing requests: $m") 
     sender ! NotDone 
    } 

    def done: Receive = { 
    case GetResponses => 
     println("sending responses") 
     sender ! results 
    } 
} 

: 예를 들어이 예에서

val requests1 = List(RequestMsg("one"), RequestMsg("two"), RequestMsg("three")) 
val handler1 = system.actorOf(Props(new Handler(requests1))) 

- 배우가 responsibility--의 구별, 제한 영역을 가져야한다는 원칙에 따라 배우는 단순히 요청과 응답을 조정합니다. 수집 된 응답에 대해 처리를 수행하지 않습니다. 반응을 얻고 처리하기 위해 다른 배우가이 액터에 GetResponses 메시지를 보낼 것입니다 (또는이 액터는 결과를 처리 액터로 사전에 전송합니다).

+0

이것은 내가 원하는 것을 수행하는 것처럼 보입니다. 하지만'done' 함수에 문제가 있습니다. 전혀 호출되지 않는 것 같습니다 (잠자기 타이머 일 필요는없는 것 같습니다). 이전에는이 ​​기능을 호출하는 다른 배우가 있었으며 처리가 이루어지는 위치에있었습니다. OP를 업데이트하여 나의 호출 클래스와 업데이트 된 전송 클래스를 포함 시켰습니다.내가 완성 된 수업에서 인쇄 된 것이 전혀 보이지 않는 이유는 물론 내 미래가 완성되지 않았는가? – Eumcoz

+1

@Eumcoz : 업데이트에 관한 몇 가지주의 사항 : (1)'RequestHandler'는'Actor'를 확장하고'system.actorOf' ('system.actorOf'는 메인 프로그램에서 호출되어야합니다. 배우가 아닌). (2) 아마도 요청 - 응답 처리가 끝나기에 충분한 시간을 할당하지 못했을 것입니다. 'Thread.sleep'을 늘리거나'Future'에서'onComplete' 콜백을 사용하십시오. – chunjef

+0

죄송합니다. 내 잘못으로, 어떤 이유로 다른 곳에서 전화를 걸고있는 것 같아서, 제 실제 프로그램 인'RequestHandler'가 제게 메인이되어 akka-http 핸들러입니다. 'SendAll'은 http 요청으로부터 Directive로부터 호출됩니다. 내가 얼마나 오래자는 지 상관없이 실패한다. SendAll 호출 타임 아웃은 다음과 같다. '실패 (akka.pattern.AskTimeoutException : [[akka : // HTTP_Actor_System/user/Get_Response_Actor_1 # -720534280] 10000 ms] Sender [null]은 "GetResponses $"유형의 메시지를 보냈습니다.) 이전 테스트에서 응답을 처리하는 데 약 5 ms가 걸렸습니다. – Eumcoz

0

가장 간단한 해결책은 모든 배우 추천을 List에 넣고 List[Future]으로 매핑하고 Future[List]을 얻으려면 Future.sequence을 사용합니다.

val route = { 
    get { 
    val listActorRefs = List(actorRef1, actorRef2, ...) 
    val futureListResponses = Future.sequence(listActorRefs.map(_ ? Request)) 
    onComplete(futureListResponses) { 
     case Success(listResponse) => ... 
     complete(...) 
     case Failure(exception) => ... 
    } 
    } 
} 

더 나은 솔루션 자체가 기다리고 반환 중지에 대한 모든 메시지를 보내드립니다 일부 ResponseCollector 배우를 준비 묻습니다 (나는 BroadcastPool 보는 것이 좋습니다) 및 일정 하나의 메시지를 '배우를 많이 피입니다 결과.