2016-07-04 3 views
1

나는 CQRS Akka 배우 앱의 질의면을하고 있습니다.Akka Persistence 질의와 배우 샤링

쿼리 배우는 클러스터 샤드로 설정되며 하나의 지속성 쿼리 스트림의 이벤트로 채워집니다.

내 질문은 :

  1. 클러스터 파편에있는 배우 중 하나가 그것을 복구하는 방법을 다시 시작하면?

    • 전체 클러스터 샤드를 종료하고 모든 이벤트에 응답 하시겠습니까?
    • 클러스터 샤드에있는 액터를 영속 액터로 만들고 쿼리 측에만 새로운 이벤트 세트를 저장 하시겠습니까?
  2. 지속성 쿼리에서 필자 인 액터가 다시 시작되면 어떻게 현재의 PQ를 취소하고 다시 시작할 수 있습니까?

+1

쿼리 액터의 상태를 메모리에만 유지하고 있습니까? 내 쿼리 측면에서는 Persistence Queries를 사용하여 데이터베이스 뷰를 업데이트합니다. – thwiegan

+0

예, 저는 배우를 기억에 남기고 있습니다. – Reeebuuk

+0

귀하의 의견은 오직 하나의 Persitence ID 이상을 소비합니까? – thwiegan

답변

2

제가 논의한 것처럼 데이터베이스에서 쿼리 측면을 지속적으로 평가할 것입니다.

그 옵션을 선택하지 않습니다 당신이 할 샤드 당 하나의 지속성 쿼리를 고수 할 경우 쿼리 배우에 다음

var inRecovery: Boolean = true; 

override def preStart() = { 
    //Subscribe to your event live stream now, so you don't miss anything during recovery 
    // e.g. send Subscription message to your persistence query actor 

    //Re-Read everything up to now for recovery 
    readJournal.currentEventsByPersistenceId("persistenceId") 
     .watchTermination()((_, f) => f pipeTo self) // Send Done to self after recovery is finished 
     .map(Replay.apply) // Mark your replay messages 
     .runWith(Sink.actorRef(self, tag)) // Send all replay events to self 
} 

override def receive = { 
    case Done => // Recovery is finished 
     inRecovery = false 
     unstashAll() // unstash all normal messages received during recovery 

    case Replay(payload) => 
     //handle replayed messages 

    case events: Event => 
     //handle normal events from your persistence query 
     inRecovery match { 
      case true => stash() // stash normal messages until recovery is done 
      case false => 
       // recovery is done, start handling normal events 
     } 
} 


case class Replay(payload: AnyRef) 

을 그래서 기본적으로 배우가 지속 쿼리 배우에 가입을 시작하기 전에 및 모든 사건이 통과 한 후에 종결되는 모든 과거 사건의 한정된 흐름으로 국가를 회복하십시오. 복구 중에는 재생 이벤트가 아닌 모든 들어오는 이벤트를 숨 깁니다. 그런 다음 복구가 완료되면 모든 것을 일시 중지하고 정상적인 메시지를 처리하기 시작합니다.

관련 문제