2016-06-03 3 views
0

내 간단한 Akka Camel 응용 프로그램이 설정되어 잘못된 SEDA 경로로 라우팅 메시지 :Akka 낙타 다음과 같이

AppleProducer -> seda:appleRoute -> AppleConsumer 

OrangeProducer -> seda:orangeRoute -> OrangeConsumer 

내가 비록보고하고하는 Apple 이벤트가 간헐적으로 그 반대의 경우도 마찬가지 OrangeConsumer에 의해 소비하고되고 있다는 점이다.

이 예제를 실행하면 (아마도 몇 번) 아래에 다시 작성됩니다.

간헐적으로 어떻게 이런 일이 발생하는지 이해할 수 없습니다. 내가 도대체 ​​뭘 잘못하고있는 겁니까?

object TestApp extends App { 
    implicit val system = ActorSystem() 
    val camel = CamelExtension(system) 
    val appleProducer = system.actorOf(Props(classOf[MyProducer], "seda:appleRoute"), "AppleProducer") 
    system.actorOf(Props(classOf[MyAppleConsumer], "seda:appleRoute"), "AppleConsumer") 
    val orangeProducer = system.actorOf(Props(classOf[MyProducer], "seda:orangeRoute"), "OrangeProducer") 
    system.actorOf(Props(classOf[MyOrangeConsumer], "seda:orangeRoute"), "OrangeConsumer") 

    appleProducer ! new Apple("1") 
    orangeProducer ! new Orange("1") 
    appleProducer ! new Apple("2") 
    orangeProducer ! new Orange("2") 
    appleProducer ! new Apple("3") 
    orangeProducer ! new Orange("3") 
    appleProducer ! new Apple("4") 
    orangeProducer ! new Orange("4") 
    appleProducer ! new Apple("5") 
    orangeProducer ! new Orange("5") 
    appleProducer ! new Apple("6") 
    orangeProducer ! new Orange("6") 

} 

class MyProducer(route: String) extends Actor with ActorLogging { 

    def receive = { 
    case payload: Any => 
     val template = CamelExtension(context.system).template 
     template.setDefaultEndpointUri(route) 
     template.sendBody(payload) 
    } 
} 

class MyAppleConsumer(route: String) extends Consumer with ActorLogging { 
    override def endpointUri: String = route 

    override def receive: Receive = { 
    case event: CamelMessage if event.body.isInstanceOf[Apple] => 
     log.info("Received event {}", event.body) 
    case _ => throw new IllegalArgumentException("Invalid entity") 
    } 
} 

class MyOrangeConsumer(route: String) extends Consumer with ActorLogging { 
    override def endpointUri: String = route 

    override def receive: Receive = { 
    case event: CamelMessage if event.body.isInstanceOf[Orange] => 
     log.info("Received event {}", event.body) 
    case _ => throw new IllegalArgumentException("Invalid entity") 
    } 
} 

class Apple(id: String) 
class Orange(id: String) 

답변

0

나는 이것을 결국 알아낼 수 있었다고 생각합니다.

이 문제는 SEDA와 관련이 없습니다. 대신 MyProducer 여러 인스턴스에 대해 동일한 DefaultProducerTemplate이 반환됩니다.

따라서, 나를 위해 defaultEndpointUri

솔루션을 설정 경쟁 조건이 가끔있다, 우리는

0

나는이 경쟁 조건이 발생하지 않도록하기 위해 MyProducer 배우의 하나 개의 인스턴스를 만드는 것이 었습니다 MyAppleConsumerMyOrangeConsumerConsumer을 사용하는 것과 같은 방식으로 MyProducer에 템플릿을 사용하는 대신 Producer이라는 특성을 확장하는 것이 좋습니다.

class MyProducer(route: String) extends Producer with OneWay { 
    def endpointUri = route 
} 

더 많은 정보를 원하시면 여기에서 찾을 수 있습니다 : (: 컴파일 또는 테스트하지! 고지 사항) :

case class Apple(id: String) 
case class Orange(id: String) 

object TestApp extends App { 
    implicit val system = ActorSystem() 

    val appleProducer = system.actorOf(Props(classOf[MyProducer], "seda:appleRoute"), "AppleProducer") 
    system.actorOf(Props(classOf[MyConsumer], "seda:appleRoute"), "AppleConsumer") 
    val orangeProducer = system.actorOf(Props(classOf[MyProducer], "seda:orangeRoute"), "OrangeProducer") 
    system.actorOf(Props(classOf[MyConsumer], "seda:orangeRoute"), "OrangeConsumer") 

    appleProducer ! Apple("1") 
    orangeProducer ! Orange("1") 
    appleProducer ! Apple("2") 
    orangeProducer ! Orange("2") 
    appleProducer ! Apple("3") 
    orangeProducer ! Orange("3") 
    appleProducer ! Apple("4") 
    orangeProducer ! Orange("4") 
    appleProducer ! Apple("5") 
    orangeProducer ! Orange("5") 
    appleProducer ! Apple("6") 
    orangeProducer ! Orange("6") 

} 

class MyProducer(route: String) extends Producer with OneWay with ActorLogging { 
    def endpointUri = route 
} 

class MyConsumer(route: String) extends Consumer with ActorLogging { 
    override def endpointUri: String = route 

    override def receive: Receive = { 
    case CamelMessage(body : Apple, headers) => 
     log.info("Received event {}", body) 
    case CamelMessage(body : Orange, headers) => 
     log.info("Received event {}", body) 
    case _ => throw new IllegalArgumentException("Invalid entity") 
    } 
} 
+0

내가 필요 난 당신이 같은 코드를 단순화 할 수 있어야한다고 생각 http://doc.akka.io/docs/akka/snapshot/scala/camel.html

을 내 프로듀서 (예제에서 생략 한)에서 사용자 지정 메시지 작성 논리를 추가하여 템플릿을 사용한 이유입니다. – DJ180