2012-08-30 3 views
5

A 유형의 메시지 10 개가 게시 된 경우 서로 다른 유형의 메시지를 이벤트 스트림에 게시해야합니다. 예를 들어 메시지의 우선 순위가 달라야합니다. 결국 B 유형의 하나의 메시지가 게시되고 B의 우선 순위는 B의 우선 순위보다 높습니다. 대기열에 유형 A의 메시지가 10 개 있더라도 메시지 B는 다음 액터에서 을 선택해야합니다.Akka :: ActorSystem의 이벤트 스트림에 우선 순위가 다른 메시지 사용

나는 우선 순위 메시지 here에 대해 읽고 해당 사서함의 내 간단한 구현 만들었습니다

class PrioritizedMailbox(settings: Settings, cfg: Config) extends UnboundedPriorityMailbox(

    PriorityGenerator { 
     case ServerPermanentlyDead => println("Priority:0"); 0 
     case ServerDead => println("Priority:1"); 1 
     case _ => println("Default priority"); 10 
    } 

) 

그때 내가 application.conf

akka { 

    actor { 

     prio-dispatcher { 
      type = "Dispatcher" 
      mailbox-type = "mailbox.PrioritizedMailbox" 
     } 

    } 

} 

에 구성을 내 배우로 유선 :

private val myActor = actors.actorOf(
    Props[MyEventHandler[T]]. 
    withRouter(RoundRobinRouter(HIVE)). 
    withDispatcher("akka.actor.prio-dispatcher"). 
    withCreator(
    new Creator[Actor] { 
     def create() = new MyEventHandler(storage) 
    }), name = "eventHandler") 

저는 ActorSystem.eventStream.publish를 사용하고 있습니다. 메시지를 보내려면 주문 번호 을 구독해야합니다 (메시지가 처리되지만 로그 순서는 FIFO 주문에서 볼 수 있습니다).

로그/콘솔에서 "기본 우선 순위"와 같은 메시지를 보지 못했기 때문에 충분하지 않은 것처럼 보입니다. 내가 여기서 뭔가를 놓치고 있니? 설명 된 접근 방식은 이벤트 스트림에서 작동하거나 직접 호출을 통해 액터에서 메시지를 보냅니 까? 그리고 eventStream으로 우선 순위가 지정된 메시지를 얻으려면 어떻게해야합니까?

답변

10

귀하의 문제는 배우가 매우 빨라서 대기열에 들어가기 전에 메시지가 처리되므로 사서함에서 어떤 사전 처리 작업도 수행 할 수 없다는 것입니다.

trait Foo 
    case object X extends Foo 
    case object Y extends Foo 
    case object Z extends Foo 

    class PrioritizedMailbox(settings: ActorSystem.Settings, cfg: Config) 
extends UnboundedPriorityMailbox( 
    PriorityGenerator { 
     case X ⇒ 0 
     case Y ⇒ 1 
     case Z ⇒ 2 
     case _ ⇒ 10 
    }) 

val s = ActorSystem("prio", com.typesafe.config.ConfigFactory.parseString( 
     """ prio-dispatcher { 
     type = "Dispatcher" 
      mailbox-type = "%s" 
     }""".format(classOf[PrioritizedMailbox].getName))) 
     val latch = new java.util.concurrent.CountDownLatch(1) 
     val a = s.actorOf(Props(new akka.actor.Actor { 
     latch.await // Just wait here so that the messages are queued up 
inside the mailbox 
     def receive = { 
      case any ⇒ /*println("Processing: " + any);*/ sender ! any 
     } 
     }).withDispatcher("prio-dispatcher")) 
     implicit val sender = testActor 
     a ! "pig" 
     a ! Y 
     a ! Z 
     a ! Y 
     a ! X 
     a ! Z 
     a ! X 
     a ! "dog" 

     latch.countDown() 

     Seq(X, X, Y, Y, Z, Z, "pig", "dog") foreach { x => expectMsg(x) } 
     s.shutdown() 

이 시험 비행 색상

를 함께 전달합니다 아래의 예는 점을 증명
관련 문제