2013-04-28 5 views
14

akka에서 이벤트 버스를 사용하는 방법에 대한 좋은 자습서/설명이 있습니까? Akka 의사를 읽었지만 이벤트 버스를 사용하는 방법을 이해하는 것이 어려워졌습니다.Akka 이벤트 버스 자습서

+0

http://www.kotancode.com/2014/02/12/using-the-akka-event-bus/ – AnonGeek

+1

건설적이지 않습니까? 다른 질문에 대한 답변을 찾을 수있는 곳은 어디입니까? –

답변

39

좋은 자습서가 있는지 여부는 확실하지 않지만 간단한 예를 들어 줄 수 있습니다. 이벤트 스트림을 사용하면 도움이 될 수있는 사용자 사례. 그러나 높은 수준에서 이벤트 스트림은 앱에있을 수있는 게시/하위 유형 요구 사항을 충족시키기위한 좋은 메커니즘입니다. 시스템의 사용자 잔액을 업데이트하는 유스 케이스가 있다고 가정 해 보겠습니다. 잔액에 자주 액세스하므로 더 나은 성능을 위해 캐시에 저장하기로 결정했습니다. 잔액이 업데이트되면 사용자가 잔액으로 임계 값을 넘었는지 확인하고 보려는 경우이를 전자 메일로 보냅니다. 중량 감소 및 사용자 응답 속도가 느려질 수 있으므로 캐시 잔액 또는 잔액 임계 값 확인을 기본 잔액 업데이트 호출에 직접 연결하지 마십시오. 이 예에서 AccountCacherLowBalanceChecker 배우

//Message and event classes 
case class UpdateAccountBalance(userId:Long, amount:Long) 
case class BalanceUpdated(userId:Long) 

//Actor that performs account updates 
class AccountManager extends Actor{ 
    val dao = new AccountManagerDao 

    def receive = { 
    case UpdateAccountBalance(userId, amount) => 
     val res = for(result <- dao.updateBalance(userId, amount)) yield{ 
     context.system.eventStream.publish(BalanceUpdated(userId)) 
     result     
     } 

     sender ! res 
    } 
} 

//Actor that manages a cache of account balance data 
class AccountCacher extends Actor{ 
    val cache = new AccountCache 

    override def preStart = { 
    context.system.eventStream.subscribe(context.self, classOf[BalanceUpdated]) 
    } 

    def receive = { 
    case BalanceUpdated(userId) => 
     cache.remove(userId) 
    } 
} 

//Actor that checks balance after an update to warn of low balance 
class LowBalanceChecker extends Actor{ 
    val dao = new LowBalanceDao 

    override def preStart = { 
    context.system.eventStream.subscribe(context.self, classOf[BalanceUpdated]) 
    } 

    def receive = { 
    case BalanceUpdated(userId) => 
     for{ 
     balance <- dao.getBalance(userId) 
     theshold <- dao.getBalanceThreshold(userId) 
     if (balance < threshold) 
     }{ 
     sendBalanceEmail(userId, balance) 
     } 
    } 
} 

을 모두 BalanceUpdated 이벤트에 대한 클래스 타입에 의해 eventStream에 가입 : 당신은 너무 같은 요구 사항의 특정 세트를 모델링 할 수 있습니다. 이 이벤트가 스트림에 이벤트 게시되면이 액터 인스턴스 모두에 의해 수신됩니다. 그런 다음 AccountManager에서 균형 업데이트가 성공하면 사용자에 대해 BalanceUpdated 이벤트가 발생합니다. 이 경우 병렬로 메시지가 AccountCacherLowBalanceChecker의 메일 함으로 배달되어 캐시와 계정 임계 값이 일치하고 전자 메일이 전송 될 수 있습니다.

자 이제 AccountManager에 직접 tell (!) 전화를 걸면이 두 명의 배우와 직접 통신 할 수 있지만 균형 업데이트의 두 가지 "부작용"을 너무 가깝게 연결한다고 주장 할 수 있습니다. 세부 정보 유형은 반드시 AccountManager에 속하지 않습니다. 순전히 핵심 비즈니스 흐름 자체의 일부가 아닌 부작용으로 발생해야하는 몇 가지 추가 작업 (검사, 업데이트 등)을 초래할 수있는 조건이있는 경우 이벤트 스트림이 좋은 방법 일 수 있습니다 제기되는 사건과 그 사건에 대응해야 할 사람을 분리 할 수 ​​있습니다.

+0

감사합니다. 몇 가지 질문 : 1) 이벤트 버스를 구독하면됩니까? 어떻게 사건 버스를 "파괴"시키는가? 2) 이벤트 버스를 담당하는 특정 배우가 있습니까? 3) 분류자를 선언하지 않았다는 것을 알았습니까? 그렇다면 선택되어있는 분류자가 있습니까? – Tsume

+1

'ActorSystem'에는 이미 이벤트 버스가 생성됩니다. 너 자신을 만들 필요가 없다. 'ActorSystem'이 버스를 생성 할 때, 나는 루트 수호자가 버스를 담당하고 있다고 가정합니다. 3 번 질문이 무슨 뜻인지 모르겠습니다. 좀 더 설명 할 수 있니? – cmbaxter

+1

방금 ​​[doc] (http://doc.akka.io/docs/akka/2.1.2/scala/event-bus.html)을 다시 읽었을 때, 저 자신의 질문을 오해 한 것처럼 보입니다. 내가 말하고자하는 것은 위 예제의 액터들이 특정 메시지 (BalanceUpdated)를 받기 위해 가입되어 있다는 것입니다. 다양한 메시지를 보낼 수있는 주제에 대해 배우를 구독하는 방법은 무엇입니까? – Tsume

10

ActorSystem마다 존재하는 EventBus이 있습니다. 이 EventBusEvent Stream으로 표시되며 system.eventStream으로 전화하여 얻을 수 있습니다.

ActorSystem은 logging을 포함하여 Dead LettersCluster Events을 포함한 여러 가지 항목에 대해 이벤트 스트림을 사용합니다.

자신 만의 게시/구독 요구 사항에 대해 이벤트 스트림을 사용할 수도 있습니다. 예를 들어, 이벤트 스트림은 테스트 중에 유용 할 수 있습니다. 특정 이벤트 (예 : 이벤트 로깅)의 경우 Test KittestActor을 이벤트 스트림에 구독하고 expect 수 있습니다. 무언가가 발생했을 때 다른 액터로 메시지를 보내지 않지만 테스트에서 이벤트를 예상해야 할 때 특히 유용 할 수 있습니다.

이벤트 스트림은 ActorSystem 하나에서만 작동합니다. 스트림에 게시 된 원격 이벤트를 사용하는 경우 기본적으로 원격 시스템을 통과하지 마십시오 (직접 지원을 추가 할 수도 있음).

이벤트 스트림을 사용하지 않으려는 경우 이론적으로 별도의 EventBus을 만들 수 있습니다.

이벤트 버스에 대한 더 나은 문서가 Akka 2.2에서 작동 중이므로 this ticket이 완료되면 다시 확인하십시오.

관련 문제