2017-11-01 2 views
0

akka 스트림으로 내 첫 번째 단계를 얻으십시오. 내가 g.run() 하지만 난 그게 어떻게 중지 할 수 있습니다를 사용하여 그래프를 실행할 수 있습니다실행 가능한 그래프를 중지하는 방법

val topHeadSink = Sink.head[Int] 
val bottomHeadSink = Sink.head[Int] 
val sharedDoubler = Flow[Int].map(_ * 2)  
val g = RunnableGraph.fromGraph(GraphDSL.create(topHeadSink, bottomHeadSink)((_, _)) { implicit builder => 
     (topHS, bottomHS) => 
     import GraphDSL.Implicits._ 
     val broadcast = builder.add(Broadcast[Int](2)) 
     Source.single(1) ~> broadcast.in  

    broadcast.out(0) ~> sharedDoubler ~> topHS.in 
    broadcast.out(1) ~> sharedDoubler ~> bottomHS.in 
    ClosedShape 
}) 

: 나는 here에서 복사, 이것과 같은 그래프가? 어떤 상황에서 을 사용해야합니까? 이 그래프는 액터 내에 포함되어 있습니다. 액터가 충돌하면 액터의 그래프가 어떻게됩니까? 끝낼 것인가?

답변

2

documentation에 설명 된대로 그래프 외부에서 그래프를 완성하는 방법은 KillSwitch입니다. 문서에서 복사 한 예제는 원본이 단일 요소 일 뿐이며 스트림을 실행할 때 매우 빨리 완료되므로이 방법을 설명하기에 적합하지 않습니다.

val topSink = Sink.foreach(println) 
val bottomSink = Sink.foreach(println) 
val sharedDoubler = Flow[Int].map(_ * 2) 
val killSwitch = KillSwitches.single[Int] 

val g = RunnableGraph.fromGraph(GraphDSL.create(topSink, bottomSink, killSwitch)((_, _, _)) { 
    implicit builder => (topS, bottomS, switch) => 

    import GraphDSL.Implicits._ 

    val broadcast = builder.add(Broadcast[Int](2)) 
    Source.fromIterator(() => (1 to 1000000).iterator) ~> switch ~> broadcast.in 

    broadcast.out(0) ~> sharedDoubler ~> topS.in 
    broadcast.out(1) ~> sharedDoubler ~> bottomS.in 
    ClosedShape 
}) 

val res = g.run // res is of type (Future[Done], Future[Done], UniqueKillSwitch) 
Thread.sleep(1000) 
res._3.shutdown() 

소스는 이제 백만 요소로 구성하고, 싱크는 이제 방송 요소를 인쇄 : 좀 더 쉽게 행동에 KillSwitch를 볼 수있는 그래프를 조정할 수 있습니다. 스트림이 1 초 동안 실행되므로 스트림을 완료하기 전에 shutdown을 호출하기 전에 100 만 개의 요소를 휘젓다는 충분한 시간이 아닙니다.

액터 내부에서 스트림을 실행하는 경우 스트림을 실행하기 위해 생성 된 기본 액터 (또는 액터)의 라이프 사이클이 "둘러싸는"액터의 라이프 사이클과 관련되어 있는지 여부는 구체화자가 작성되는 방법에 따라 다릅니다. 자세한 내용은 documentation을 읽어보십시오. 콜린 브렉 (Colin Breck)이 연기자와 KillSwitch을 사용하여 스트림의 라이프 사이클을 관리하는 블로그 게시물은 다음과 같이 유용합니다. http://blog.colinbreck.com/integrating-akka-streams-and-akka-actors-part-ii/

관련 문제