저는 GraphStage를 작성해야하지만 몇 가지 문제가있었습니다. 코드를 아래에 압축하여 사람들이 나를 위해 그것을 밝힐 수 있기를 바랍니다.Akka Streams의 GraphStage 관련 문제
아래의 샘플 코드는 실제 사용 사례가 아니며, 요점을 설명하기위한 것입니다. 잘하면 그것은 akka 스트림에 대해 이해하지 못하는 것이지 한계가 아닙니다.
샘플 코드는 WrapFlowShape가있는 그래프를 작성하고 기본적으로 그래프의 "in"을 연결 흐름으로, 그래프의 "out"을 흐름 밖으로 리디렉션합니다. 되고
1 out.onPull
2 in.onPush
3 flowOut.onPull
InHandler.onPush() "flowIn"님
1 out.onPull
2 in.onPush
3 flowOut.onPull
4 flowIn.onPush
result = 2
하지만 실제 출력은 상기 제 3 개 라인이다
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.javadsl.RunnableGraph
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.collection.immutable
import scala.io.StdIn
object WrapFlowSandbox extends App {
case class WrapFlowShape[I, O](
in: Inlet[I],
out: Outlet[O],
flowIn: Inlet[O],
flowOut: Outlet[I]) extends Shape {
val inlets: immutable.Seq[Inlet[_]] = in :: flowIn :: Nil
val outlets: immutable.Seq[Outlet[_]] = out :: flowOut :: Nil
def deepCopy = WrapFlowShape(in.carbonCopy, out.carbonCopy, flowIn.carbonCopy, flowOut.carbonCopy)
}
class WrapFlow[I, O] extends GraphStage[WrapFlowShape[I, O]] {
val in: Inlet[I] = Inlet[I]("WrapFlow.in")
val out: Outlet[O] = Outlet[O]("WrapFlow.out")
val flowIn: Inlet[O] = Inlet[O](s"Select.flowIn")
val flowOut: Outlet[I] = Outlet[I](s"Select.flowOut")
val shape: WrapFlowShape[I, O] = WrapFlowShape(in, out, flowIn, flowOut)
def createLogic(initialAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
var inElem: I = _
setHandler(in, new InHandler {
def onPush = {
println("2 in.onPush")
inElem = grab(in)
pull(flowIn)
}
})
setHandler(out, new OutHandler {
def onPull = {
println("1 out.onPull")
pull(in)
}
})
setHandler(flowIn, new InHandler {
def onPush = {
println("4 flowIn.onPush")
val outElem = grab(flowIn)
push(out, outElem)
}
})
setHandler(flowOut, new OutHandler {
def onPull = {
println("3 flowOut.onPull")
push(flowOut, inElem)
}
})
}
}
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val flow = Flow[Int].map(_ + 1)
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val select = b.add(new WrapFlow[Int, Int])
Source.single(1) ~> select.in
select.out ~> Sink.foreach[Int](r => println(s"result = $r"))
select.flowOut ~> flow ~> select.flowIn
ClosedShape
}).run(materializer)
StdIn.readLine
system.terminate
}
난이 기대 출력은 결코 부르지 않았다.
이런 식으로 GraphStage를 작성하는 것이 비관적이지만, 필자에게는 그 필요성이 있습니다. 저 퍼즐 무엇
제가 2 단계 (손잡이 (flowIn)) 차례로 연결된 흐름에 당겨 부착 흐름에 대한 수요를 발생하는 3 단계
에서 "flowOut"에 대한 요구를 생성한다는 것이다하지만 3 단계에서 flowOut을 통해 요소를 푸시 한 후 요소가 푸시되지 않았으므로 4 단계는 실행되지 않았습니다.
왜 그런가요?
첨부 된 흐름이 요청을 다운 스트림으로 감지하고 3 단계에서 요청을 업스트림으로 생성하는 경우 3 단계에서 밀어 넣은 요소가 연결된 스트림으로 전달되지 않는 이유는 무엇입니까?
Btw. 귀하의 모양은 이미 akka에서 'BidiShape'로 정의되어 있습니다. 대답에 대해 – jrudolph