다른 액터에서 메시지를 보내 일시 중지/일시 중지 해제 할 수있는 GraphStage를 작성하고 싶습니다.Akka Stream - Pausable GraphStage (Akka 2.5.7)
아래 코드는 임의 번호를 생성하는 간단한 GraphStage
을 보여줍니다. 무대가 구체화되면 GraphStageLogic
은 StageActor
이 포함 된 메시지 (preStart()
)를 감독자에게 보냅니다. 감독자는 무대의 ActorRef
을 유지하므로 무대 제어에 사용할 수 있습니다. 응용 프로그램이 단계를 시작하면
object Application extends App {
implicit val system = ActorSystem("my-actor-system")
implicit val materializer = ActorMaterializer()
val supervisor = system.actorOf(Props[Supervisor], "supervisor")
val sourceGraph: Graph[SourceShape[Int], NotUsed] = new RandomNumberSource(supervisor)
val randomNumberSource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph)
randomNumberSource.take(100).runForeach(println)
println("Start stream by pressing any key")
StdIn.readLine()
supervisor ! UnPause
StdIn.readLine()
supervisor ! Pause
StdIn.readLine()
println("=== Terminating ===")
system.terminate()
}
:
object Supervisor {
case class AssignStageActor(ref: ActorRef)
}
class Supervisor extends Actor with ActorLogging {
var stageActor: Option[ActorRef] = None
override def receive: Receive = {
case AssignStageActor(ref) =>
log.info("Stage assigned!")
stageActor = Some(ref)
ref ! Done
case Pause =>
log.info("Pause stream!")
stageActor match {
case Some(ref) => ref ! Pause
case _ =>
}
case UnPause =>
log.info("UnPause stream!")
stageActor match {
case Some(ref) => ref ! UnPause
case _ =>
}
}
}
내가 스트림을 실행하려면 다음 응용 프로그램을 사용하고 있습니다 :
object RandomNumberSource {
case object Pause
case object UnPause
}
class RandomNumberSource(supervisor: ActorRef) extends GraphStage[SourceShape[Int]] {
val out: Outlet[Int] = Outlet("rnd.out")
override val shape: SourceShape[Int] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new RandomNumberSourceLogic(shape)
}
private class RandomNumberSourceLogic(shape: Shape) extends GraphStageLogic(shape) with StageLogging {
lazy val self: StageActor = getStageActor(onMessage)
val numberGenerator: Random = Random
var isPaused: Boolean = true
override def preStart(): Unit = {
supervisor ! AssignStageActor(self.ref)
}
setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (!isPaused) {
push(out, numberGenerator.nextInt())
Thread.sleep(1000)
}
}
})
private def onMessage(x: (ActorRef, Any)): Unit =
{
x._2 match {
case Pause =>
isPaused = true
log.info("Stream paused")
case UnPause =>
isPaused = false
getHandler(out).onPull()
log.info("Stream unpaused!")
case _ =>
}
}
}
}
이 수퍼바이저 배우의 매우 간단한 구현은 ia는 '일시 중지됨'상태이며 어떤 숫자도 생성하지 않습니다. 열쇠를 누르면 무대에서 숫자가 나오기 시작합니다. 하지만 문제는 시작된 후에 무대로 보내지는 모든 메시지가 무시된다는 것입니다. 나는 무대를 멈출 수 없다.
액터로부터받은 메시지를 기반으로 무대의 동작을 변경하는 데 관심이 있습니다. 그러나 발견 된 모든 예제는 액터의 메시지를 스트림에 전달합니다.
누군가 내 코드가 작동하지 않는 이유를 추측하거나 그러한 빌드 방법을 알고 있습니까? GraphStage
?
대단히 감사합니다!
답변 해 주셔서 감사합니다. 당신의 제안에 따라 나는 AsyncCallback을 사용하여 동일한 [RandomNumberSource] (https://gist.github.com/kKdH/b03064f1f500f64e0e2058fb82bf0750#file-randomnumbersource-scala)를 구현했다. 밸브처럼 사용합니다. 그러나 여전히 같은 문제. 스트림이 실행되는 즉시 스테이지는 콜백 호출에 반응하지 않습니다. 'push()'가 처음으로 호출 될 때까지 문제를 좁힐 수 있습니다. 너는 어떤 생각을 가지고 있니? – kKdH