2013-05-29 5 views
23

akka가 메시지 전달을 보장하지 않는 이유와 방법에 대한 몇 가지 게시물을 읽었습니다. documentation,이 discussion 및 그룹에 대한 다른 토론에서는 잘 설명합니다.오른쪽 디자인 in akka. - 메시지 전달

나는 akka에 아주 익숙하며 케이스에 적합한 디자인을 알고 싶습니다. 예를 들어 서로 다른 기계에 3 명의 다른 배우가 있다고 가정 해 보겠습니다. 하나는 요리 책을, 다른 하나는 역사를, 다른 하나는 기술 도서를 담당합니다.

다른 컴퓨터에 주인공이 있습니다. 우리가 사용할 수있는 책이 있는지 검색하기 위해 주 배우에게 질의가 있다고 가정합니다. 메인 액터는 3 명의 원격 액터에게 요청을 보내고 그 결과를 기대합니다. 그래서 나는 이렇게한다 :

val scatter = system.actorOf(
     Props[SearchActor].withRouter(ScatterGatherFirstCompletedRouter(
       routees=someRoutees, within = 10 seconds)), "router") 
    implicit val timeout = Timeout(10 seconds) 
    val futureResult = scatter ? Text("Concurrency in Practice") 
    //  What should I do here?. 
    //val result = Await.result(futureResult, timeout.duration) line(a) 

요컨대, 나는 3 명의 모든 원격 액터에게 요청을 보내고 10 초 내에 결과를 기대한다.

조치는 어떻게해야합니까?

  1. 10 초 후에 결과를 얻지 못한다고 말하면 모두에게 다시 요청해야합니까?
  2. 위의 경우 within 시간이시기 상조입니다. 그러나 나는 얼마나 많은 시간이 걸릴지에 관해 미리 알지 못한다.
  3. within 시간이 충분하지만 메시지가 삭제 된 경우 어떻게됩니까?

within 시간 내에 응답을받지 못하면 요청을 다시 다시 보내십시오. 이런 식으로 뭔가, 그것은 비동기 유지 :

futureResult onComplete{ 
    case Success(i) => println("Result "+i) 
    case Failure(e) => //send again 
} 

그러나 너무 많은 쿼리 아래 못해 통화와 부피가 너무 많은 스레드 수

? line(a)의 주석 처리를 제거하면 동기식으로되어로드 부족으로 인해 성능이 크게 저하 될 수 있습니다.

10 초 후에 응답을받지 못한다고합니다. within 시간이시기 상조라면 무거운 쓸모없는 계산이 다시 발생합니다. 메시지가 끊어지면 10 초의 귀중한 시간이 낭비됩니다. 메시지가 배달되었다는 것을 알았다면 회의 시간이 이 아니고 오랜 기간 동안 기다릴 것입니다.

사람들은 어떻게 이러한 문제를 해결합니까? ACK? 그런 다음 모든 쿼리의 액터에 상태를 저장해야합니다. 그것은 공통점이 있어야하며 올바른 디자인을 찾고 있습니다.

답변

24

나는이 질문 중 일부에 대답하려고합니다. 나는 모든 것에 대해 구체적인 해답을 얻지는 않을 것이지만, 바라건대 올바른 방향으로 인도 할 수 있습니다.

먼저 도서 검색을하는 3 명의 배우에게 요청을 전달하는 방법을 변경해야합니다. ScatterGatherFirstCompletedRouter을 사용하는 것은 올바른 접근 방법이 아닙니다. 이 라우터는 라우트 중 하나 (첫 번째 응답)에 대한 응답을 기다리기 때문에 다른 두 라우트의 결과가 포함되지 않으므로 결과 세트가 불완전합니다. BroadcastRouter도 있지만 tell (!) 만 처리하고 ask (?)이 아닌 사용자 요구에 맞지 않습니다. 원하는 작업을 수행하는 방법 중 하나는 각 수신자에게 요청을 보내어 응답에 대해 Futures을 얻은 다음 Future을 사용하여 Future.sequence을 결합하는 것입니다.

case class SearchBooks(title:String) 
case class Book(id:Long, title:String) 

class BookSearcher extends Actor{ 

    def receive = { 
    case req:SearchBooks => 
     val routees:List[ActorRef] = ...//Lookup routees here 
     implicit val timeout = Timeout(10 seconds) 
     implicit val ec = context.system.dispatcher 

     val futures = routees.map(routee => (routee ? req).mapTo[List[Book]]) 
     val fut = Future.sequence(futures) 

     val caller = sender //Important to not close over sender 
     fut onComplete{ 
     case Success(books) => caller ! books.flatten 

     case Failure(ex) => caller ! Status.Failure(ex) 
     } 
    } 
} 

는 이제 우리의 마지막 코드가 될 수 없습니다,하지만 당신의 샘플이해야 할 시도 있었는지의 근사치입니다 : 단순화 된 예는 다음과 같을 수 있습니다. 이 예에서 다운 스트림 경로 중 하나가 실패하거나 시간 초과되면 우리는 Failure 블록을 공격 할 것이고 호출자는 또한 실패 할 것입니다. 모두 성공하면 호출자는 Book 개체의 집계 목록을 대신 가져옵니다.

질문에 대한 답변입니다. 먼저 타임 아웃 내에있는 경로 중 하나에서 응답을받지 못한 경우 모든 배우에게 다시 요청해야하는지 묻습니다. 이 질문에 대한 답은 정말로 당신에게 달렸습니다. 다른 사용자가 부분 결과 (예 : 3 명의 배우 중 2 명)의 결과를 보도록 허용 하시겠습니까? 아니면 매번 항상 결과의 전체 집합이어야합니까? timesout routees의 일부 또는 어떤 이유로 실패 할 경우, '의 빈 목록을이 코드

val futures = routees.map(routee => (routee ? req).mapTo[List[Book]].recover{ 
    case ex => 
    //probably log something here 
    List() 
}) 

: 대답은'예, 당신은 다음과 같이 할 수 routees에 보내는 코드를 조정할 수 도서 '대신 실패로 대응할 예정이다. 이제 부분 결과로 살 수 없다면 전체 요청을 다시 다시 보낼 수 있지만 상대방이 책 결과를 기다리고 누군가를 영원히 기다리지 않으려한다는 것을 기억해야합니다.

두 번째 질문은 타임 아웃이시기 상조인지 여부입니다. 선택한 제한 시간 값은 완전히 당신에게 달려 있지만, 가장 큰 가능성은 두 가지 요소에 근거해야합니다. 첫 번째 요소는 검색 호출 시간 테스트입니다. 얼마나 오래 걸릴지 평균치를 알아 내고 안전을 위해 약간의 방석으로 값을 선택하십시오. 두 번째 요소는 상대방이 누군가 자신의 결과를 기다리는 시간입니다. 당신은 당신의 타임 아웃에서 매우 보수적 일 수 있습니다. 단지 60 초를 안전하게하는 것입니다. 그러나 실제로 다른 쪽에서 결과를 기다리는 누군가가 있다면, 그들은 얼마나 오랫동안 기다릴 의향이 있습니까? 나는 오히려 영원히 기다리지 않고 다시 시도해야한다는 실패 응답을 얻고 싶습니다. 따라서이 두 가지 요인을 고려하여 상대방의 발신자가 너무 오랫동안 기다리지 않도록하면서 응답 시간을 매우 높일 수있는 값을 선택해야합니다.

질문 3의 경우 메시지가 삭제되면 어떻게되는지 묻습니다. 이 경우 필자는 수신자가 응답 할 메시지를받지 않기 때문에 응답을받지 못하기 때문에 해당 메시지를 수신 한 사람의 미래가 시간 초과 될 것이라고 추측합니다. Akka는 JMS가 아닙니다. 수신자가 메시지를 수신하고 응답하지 않으면 메시지가 여러 번 재전송 될 수있는 승인 모드가 없습니다.

제 예제에서 볼 수 있듯이 을 사용하여 Future 총계를 차단하지 않는 것에 동의합니다. 나는 비 블로킹 콜백을 사용하는 것을 선호한다. Actor 인스턴스가 차단 작업이 완료 될 때까지 해당 사서함 처리를 중지하므로 수신 기능에서 차단이 적합하지 않습니다. 비 블로킹 콜백을 사용하면 해당 인스턴스를 해제하여 사서함 처리로 되돌아 가서 결과 처리가 ExecutionContext에서 실행되는 다른 작업 일 수있게하고 해당 사서함을 처리하는 액터와 분리됩니다.

이제 네트워크가 불안정 할 때 실제로 통신을 낭비하지 않으려는 경우, Akka 2.2에서 사용할 수있는 Reliable Proxy을 조사 할 수 있습니다. 이 길을 가고 싶지 않다면, ping 유형의 메시지를 주기적으로 길에 보내서 직접 굴릴 수 있습니다. 응답 시간이 길면 응답을하지 않는 것으로 표시하고 ping이라는 매우 짧은 시간에 신뢰할 수있을 때까지 메시지를 보내지 않습니다. 이는 일종의 FSM과 같습니다.이 동작이 절대적으로 필요하면 작동 할 수 있지만 이러한 솔루션은 복잡성을 더하게되므로 반드시 이러한 동작이 필요하면 고용해야합니다. 은행 소프트웨어를 개발 중이고 나쁜 금융 문제가 발생할 수 있으므로 보장 된 전달 의미론이 절대적으로 필요하다면 꼭 모든 접근 방식을 고려해야합니다. 너가하지 않는 시간의 90 %를 내기 때문에 너가 이것 같이 무언가를 필요로 한 ㄴ다는 것을 결정에서 다만 현명한 있으 십시요. 귀하의 모델에서, 이미 알고있는 어떤 것을 기다리는 것으로 영향을받는 유일한 사람은 상대방의 호출자입니다. 액터에서 논 블로킹 콜백을 사용함으로써, 뭔가 시간이 오래 걸린다는 사실에 의해 중단되지 않습니다; 그것은 이미 다음 메시지로 옮겨졌습니다. 실패하면 다시 제출하기로 결정할 때 조심해야합니다. 수령하는 배우들의 사서함을 넘치고 싶지는 않습니다. 다시 보내기로 결정한 경우 고정 된 횟수만큼 끝내십시오.

이러한 보장 된 종류의 의미가 필요한 경우 가능한 다른 방법은 Akka의 Clustering Model을 살펴 보는 것입니다. 다운 스트림 라우트를 클러스터링하고 서버 중 하나에서 장애가 발생하면 모든 트래픽은 다른 노드가 복구 될 때까지 여전히 있던 노드로 라우팅됩니다.

+0

자세한 답변을 보내 주셔서 감사합니다. 이것은 내 옆에서 약간의 현상금을받을 자격이있다 :). 또한 3 가지 질문 후에 언급 한 다른 질문에 대답 해 주실 수 있습니까? 일부 메시지 드롭 다운이있을 수 있다는 사실을 고려하여 문제를 해결하려고합니다. – Jatin

+0

조금 더 자세한 정보를 추가했지만 여전히 확실하지 않습니다. 귀하의 질문에 100 % 답변했습니다. 그것이 비록 도움이 되길 바래. – cmbaxter

+0

확인. 요약하기 위해 메시지 드롭 다운을 고려하지 말고 '시간 내에'집중해야합니다. 'within'시간보다 오래 걸리는 경우 메시지가 삭제 된 것으로 간주되어 (대부분의 경우) 후속 조치가 취해질 수 있습니다. 유일한 문제는 'within'시간이 큰 경우 (이미지 처리 작업 말하기), 다른 대안을 사용할 수 있거나 'ack'이라고 말할 수있는 경우 일 수 있습니다. 연습에서의 일반적인 질문 : 적절하게 연결되어있는 곳에서 얼마나 자주 메시지가 삭제됩니까? – Jatin

관련 문제