2017-12-03 3 views
1

다른 액터에서 메시지를 보내 일시 중지/일시 중지 해제 할 수있는 GraphStage를 작성하고 싶습니다.Akka Stream - Pausable GraphStage (Akka 2.5.7)

아래 코드는 임의 번호를 생성하는 간단한 GraphStage을 보여줍니다. 무대가 구체화되면 GraphStageLogicStageActor이 포함 된 메시지 (preStart())를 감독자에게 보냅니다. 감독자는 무대의 ActorRef을 유지하므로 무대 제어에 사용할 수 있습니다. 응용 프로그램이 단계를 시작하면

object Application extends App { 

    implicit val system = ActorSystem("my-actor-system") 
    implicit val materializer = ActorMaterializer() 

    val supervisor = system.actorOf(Props[Supervisor], "supervisor") 

    val sourceGraph: Graph[SourceShape[Int], NotUsed] = new RandomNumberSource(supervisor) 
    val randomNumberSource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph) 

    randomNumberSource.take(100).runForeach(println) 

    println("Start stream by pressing any key") 

    StdIn.readLine() 

    supervisor ! UnPause 

    StdIn.readLine() 

    supervisor ! Pause 

    StdIn.readLine() 

    println("=== Terminating ===") 
    system.terminate() 
} 

:

object Supervisor { 
    case class AssignStageActor(ref: ActorRef) 
} 

class Supervisor extends Actor with ActorLogging { 

    var stageActor: Option[ActorRef] = None 

    override def receive: Receive = { 

    case AssignStageActor(ref) => 
     log.info("Stage assigned!") 
     stageActor = Some(ref) 
     ref ! Done 

    case Pause => 
     log.info("Pause stream!") 
     stageActor match { 
     case Some(ref) => ref ! Pause 
     case _ => 
     } 

    case UnPause => 
     log.info("UnPause stream!") 
     stageActor match { 
     case Some(ref) => ref ! UnPause 
     case _ => 
     } 
    } 
} 

내가 스트림을 실행하려면 다음 응용 프로그램을 사용하고 있습니다 :

object RandomNumberSource { 
    case object Pause 
    case object UnPause 
} 

class RandomNumberSource(supervisor: ActorRef) extends GraphStage[SourceShape[Int]] { 

    val out: Outlet[Int] = Outlet("rnd.out") 

    override val shape: SourceShape[Int] = SourceShape(out) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = { 
    new RandomNumberSourceLogic(shape) 
    } 

    private class RandomNumberSourceLogic(shape: Shape) extends GraphStageLogic(shape) with StageLogging { 

    lazy val self: StageActor = getStageActor(onMessage) 

    val numberGenerator: Random = Random 
    var isPaused: Boolean = true 

     override def preStart(): Unit = { 
     supervisor ! AssignStageActor(self.ref) 
     } 

     setHandler(out, new OutHandler { 
     override def onPull(): Unit = { 
      if (!isPaused) { 
      push(out, numberGenerator.nextInt()) 
      Thread.sleep(1000) 
      } 
     } 
     }) 

     private def onMessage(x: (ActorRef, Any)): Unit = 
     { 
     x._2 match { 
      case Pause => 
      isPaused = true 
      log.info("Stream paused") 
      case UnPause => 
      isPaused = false 
      getHandler(out).onPull() 
      log.info("Stream unpaused!") 
      case _ => 
     } 
     } 
    } 
} 

이 수퍼바이저 배우의 매우 간단한 구현은 ia는 '일시 중지됨'상태이며 어떤 숫자도 생성하지 않습니다. 열쇠를 누르면 무대에서 숫자가 나오기 시작합니다. 하지만 문제는 시작된 후에 무대로 보내지는 모든 메시지가 무시된다는 것입니다. 나는 무대를 멈출 수 없다.

액터로부터받은 메시지를 기반으로 무대의 동작을 변경하는 데 관심이 있습니다. 그러나 발견 된 모든 예제는 액터의 메시지를 스트림에 전달합니다.

누군가 내 코드가 작동하지 않는 이유를 추측하거나 그러한 빌드 방법을 알고 있습니까? GraphStage?

대단히 감사합니다!

답변

1

Akka Stream Contrib 프로젝트에는 흐름을 일시 중지했다가 다시 시작할 수있는 값을 구체화하는 Valve 스테이지가 있습니다. 이 클래스에서 Scaladoc :

중지 또는 스테이지를 통과 요소의 흐름을 다시 시작 상기 플립 방법을 제공 ValveSwitch의 미래 구체화. 밸브가 닫혀있는 한 배압이 걸립니다. 예를 들어

:

val (switchFut, seqSink) = Source(1 to 10) 
    .viaMat(new Valve(SwitchMode.Close))(Keep.right) 
    .toMat(Sink.seq)(Keep.both) 
    .run() 

switchFutFuture[ValveSwitch]하고, 스위치가 초기에 폐쇄되기 때문에, 배압 밸브 아무것도 하류 방출된다. 밸브를 여는 방법 :

switchFut.onComplete { 
    case Success(switch) => 
    switch.flip(SwitchMode.Open) // Future[Boolean] 
    case _ => 
    log.error("the valve failed") 
} 

기타 예제는 ValveSpec입니다.

+0

답변 해 주셔서 감사합니다. 당신의 제안에 따라 나는 AsyncCallback을 사용하여 동일한 [RandomNumberSource] (https://gist.github.com/kKdH/b03064f1f500f64e0e2058fb82bf0750#file-randomnumbersource-scala)를 구현했다. 밸브처럼 사용합니다. 그러나 여전히 같은 문제. 스트림이 실행되는 즉시 스테이지는 콜백 호출에 반응하지 않습니다. 'push()'가 처음으로 호출 될 때까지 문제를 좁힐 수 있습니다. 너는 어떤 생각을 가지고 있니? – kKdH