2017-10-30 2 views
0

처음으로 Akka, 특히 Akka 지속성을 실험하고 있습니다. 궁극적으로 이벤트 소스 애플리케이션에서 Akka의 사용법을 재현 할 수있는 작은 장난감 프로그램을 구현하려고합니다. ReadJournal을 사용하여 내 이벤트 스트림을 내 도메인으로 투영하려고 시도 할 때까지 성공했습니다.Akka 지속성 : ReadJournal.runFold never returns

def main(args: Array[String]): Unit = { 
    val commands: EmployeeCommandStream = TestEmployeeCommandStream(EmployeeId.generate()) 

    implicit val executionContext = ExecutionContext.global 
    implicit val system = ActorSystem.create("employee-service-actor-system") 
    implicit val mat: Materializer = ActorMaterializer()(system) 

    val service = system.actorOf(Props(classOf[EmployeeActor], commands.employeeId)) 

    commands.stream.foreach(command => service.tell(command, noSender)) 

    lazy val readJournal = PersistenceQuery(system).readJournalFor("inmemory-read-journal") 
     .asInstanceOf[ReadJournal 
     with CurrentPersistenceIdsQuery 
     with CurrentEventsByPersistenceIdQuery 
     with CurrentEventsByTagQuery 
     with EventsByPersistenceIdQuery 
     with EventsByTagQuery] 

    println(Await.result(
     readJournal 
     .eventsByPersistenceId(commands.employeeId.toString, 0L, Long.MaxValue) 
     .map(_.event) 
     .runFold(Employee.apply())({ 
      case (employee: Employee, event: EmployeeEvent) => employee.apply(event) 
     }), 
     Duration("10s") 
    ))  
} 

내 도메인의 유일한 집계는 Employee, 그래서 그냥 직원을 대표하는 UUID와 배우를 시작하고, 그리고 내가 그 직원에 대한 몇 가지 명령을 실행하고 있습니다.

위 예제에서 println(Await.result(...))을 제거하고 .runFold(...).runForeach(println)으로 바꾼 경우 각 명령에 대해 내 액터에 유지 된 이벤트가 예상대로 인쇄됩니다. 그래서 나는 내 프로그램의 쓰기면과 ReadJournal이 모두 예상대로 작동하고 있음을 안다. 내 프로그램은 이제

Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [10 seconds] 

로 종료됨에 따라-

제 질문은, 왜 나는 궁극적으로 내 이벤트 스트림을 재생할 runFold 수행 할 수 있습니까? 이 작업을 수행하는 더 좋은 방법이 있습니까? API를 오용 한 것입니까?

감사합니다. 감사합니다.

답변

1

runFold을 사용하면 스트림을 폴딩 할 수 있습니다. 스트림 자체가 종료되면 폴드가 효과적으로 종료됩니다.

eventsByPersistenceId을 사용하면 결코 라이브 이벤트 스트림을 요청하지 않으므로 폴드가 종료되지 않습니다.

사용 사례 대신 currentEventsByPersistenceId을 사용해야합니다. 이 변형은 현재 저널에서 사용 가능한 이벤트를 스트리밍하고 종료합니다.

https://doc.akka.io/docs/akka/2.5.6/scala/persistence-query.html#eventsbypersistenceidquery-and-currenteventsbypersistenceidquery를 참조

+0

아, 당신의 도움을 주셔서 감사합니다 - 나는이 문서에서 찾을 수 없습니다 하나 일 : 스트림 종료 할 수있는 방법은 무엇인가? 나는'registerOnTerminate (...) '콜백을 붙이고 프로그램을 종료하는'system.stop (service)'을 시도했다. Akka가 스트림 종료를 제공하는 간단한 방법이 있는가? 터미널이 '싱크 (Sink)'를 제공하는 열쇠입니까? 그래서 스트림이 결국 어떻게 종료되어야하는지에 대한 개념이 있습니까? – lyonssp

+0

'KillSwitch'es, https://doc.akka.io/docs/akka/current/scala/stream/stream-dynamic.html#uniquekillswitch 및 https://doc.akka.io/docs/akka/current를 참조하십시오. /scala/stream/stream-dynamic.html#sharedkillswitch –

+0

당신은 전설이고, 감사합니다. 나는 다음에 문서를 더 깊이 파고들 것을 약속한다 : p – lyonssp