2017-03-25 1 views
1

저는 GraphStage를 작성해야하지만 몇 가지 문제가있었습니다. 코드를 아래에 압축하여 사람들이 나를 위해 그것을 밝힐 수 있기를 바랍니다.Akka Streams의 GraphStage 관련 문제

아래의 샘플 코드는 실제 사용 사례가 아니며, 요점을 설명하기위한 것입니다. 잘하면 그것은 akka 스트림에 대해 이해하지 못하는 것이지 한계가 아닙니다.

샘플 코드는 WrapFlowShape가있는 그래프를 작성하고 기본적으로 그래프의 "in"을 연결 흐름으로, 그래프의 "out"을 흐름 밖으로 리디렉션합니다. 되고

1 out.onPull 
2 in.onPush 
3 flowOut.onPull 

InHandler.onPush() "flowIn"님

1 out.onPull 
2 in.onPush 
3 flowOut.onPull 
4 flowIn.onPush 
result = 2 

하지만 실제 출력은 상기 제 3 개 라인이다

import akka.actor.ActorSystem 
import akka.stream._ 
import akka.stream.javadsl.RunnableGraph 
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source} 
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler} 

import scala.collection.immutable 
import scala.io.StdIn 

object WrapFlowSandbox extends App { 
    case class WrapFlowShape[I, O](
     in: Inlet[I], 
     out: Outlet[O], 
     flowIn: Inlet[O], 
     flowOut: Outlet[I]) extends Shape { 
    val inlets: immutable.Seq[Inlet[_]] = in :: flowIn :: Nil 
    val outlets: immutable.Seq[Outlet[_]] = out :: flowOut :: Nil 
    def deepCopy = WrapFlowShape(in.carbonCopy, out.carbonCopy, flowIn.carbonCopy, flowOut.carbonCopy) 
    } 
    class WrapFlow[I, O] extends GraphStage[WrapFlowShape[I, O]] { 
    val in: Inlet[I] = Inlet[I]("WrapFlow.in") 
    val out: Outlet[O] = Outlet[O]("WrapFlow.out") 
    val flowIn: Inlet[O] = Inlet[O](s"Select.flowIn") 
    val flowOut: Outlet[I] = Outlet[I](s"Select.flowOut") 
    val shape: WrapFlowShape[I, O] = WrapFlowShape(in, out, flowIn, flowOut) 
    def createLogic(initialAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { 
     var inElem: I = _ 
     setHandler(in, new InHandler { 
     def onPush = { 
      println("2 in.onPush") 
      inElem = grab(in) 
      pull(flowIn) 
     } 
     }) 
     setHandler(out, new OutHandler { 
     def onPull = { 
      println("1 out.onPull") 
      pull(in) 
     } 
     }) 
     setHandler(flowIn, new InHandler { 
     def onPush = { 
      println("4 flowIn.onPush") 
      val outElem = grab(flowIn) 
      push(out, outElem) 
     } 
     }) 
     setHandler(flowOut, new OutHandler { 
     def onPull = { 
      println("3 flowOut.onPull") 
      push(flowOut, inElem) 
     } 
     }) 
    } 
    } 
    implicit val system = ActorSystem() 
    implicit val materializer = ActorMaterializer() 
    val flow = Flow[Int].map(_ + 1) 
    RunnableGraph.fromGraph(GraphDSL.create() { implicit b => 
    import GraphDSL.Implicits._ 
    val select = b.add(new WrapFlow[Int, Int]) 
    Source.single(1) ~> select.in 
    select.out ~> Sink.foreach[Int](r => println(s"result = $r")) 
    select.flowOut ~> flow ~> select.flowIn 
    ClosedShape 
    }).run(materializer) 
    StdIn.readLine 
    system.terminate 
} 

난이 기대 출력은 결코 부르지 않았다.

이런 식으로 GraphStage를 작성하는 것이 비관적이지만, 필자에게는 그 필요성이 있습니다. 저 퍼즐 무엇

제가 2 단계 (손잡이 (flowIn)) 차례로 연결된 흐름에 당겨 부착 흐름에 대한 수요를 발생하는 3 단계

에서 "flowOut"에 대한 요구를 생성한다는 것이다

하지만 3 단계에서 flowOut을 통해 요소를 푸시 한 후 요소가 푸시되지 않았으므로 4 단계는 실행되지 않았습니다.

왜 그런가요?

첨부 된 흐름이 요청을 다운 스트림으로 감지하고 3 단계에서 요청을 업스트림으로 생성하는 경우 3 단계에서 밀어 넣은 요소가 연결된 스트림으로 전달되지 않는 이유는 무엇입니까?

+0

Btw. 귀하의 모양은 이미 akka에서 'BidiShape'로 정의되어 있습니다. 대답에 대해 – jrudolph

답변

1

처리기의 논리를 잘 모르겠습니다. 나는 그들을 개정 나는 당신의 GraphDSL.create() 내용에서 이해 내용에 따라 다음

out.onPull 
flowOut.onPull 
in.onPush 
flowIn.onPush 
result = 2 

copyFromPorts()WrapFlowShape 경우에 오버라이드 (override)되지 않은 그 방법을 발견 : 그것은 다음과 같은 출력을 생성한다 실행

def createLogic(initialAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { 
    var inElem: I = _ 
    setHandler(in, new InHandler { 
    def onPush = { 
     println("in.onPush") 
     inElem = grab(in) 
     push(flowOut, inElem) 
    } 
    }) 
    setHandler(out, new OutHandler { 
    def onPull = { 
     println("out.onPull") 
     pull(flowIn) 
    } 
    }) 
    setHandler(flowIn, new InHandler { 
    def onPush = { 
     println("flowIn.onPush") 
     val outElem = grab(flowIn) 
     push(out, outElem) 
    } 
    }) 
    setHandler(flowOut, new OutHandler { 
    def onPull = { 
     println("flowOut.onPull") 
     pull(in) 
    } 
    }) 
} 

클래스 (추상 클래스가 아님) 나는 당신이 다음과 같은 것으로 그것을 오버라이드 할 필요가 있다고 믿는다 :

override def copyFromPorts(
    inlets: immutable.Seq[Inlet[_]], 
    outlets: immutable.Seq[Outlet[_]]) = { 
    WrapFlowShape[I, O](
    inlets(0).as[I], 
    outlets(0).as[O], 
    inlets(1).as[O], 
    outlets(1).as[I]) 
} 
+0

고마워요. 작동하지만 내 유스 케이스는 그보다 조금 더 복잡합니다. 간단하게 만들었지 만 실제로는 여러 개의 flowIn과 flowOut가 있습니다. 내가 어떤 흐름을 알기 전에 inElem을 얻을 필요가있어. 그래서 나는 그들 모두를 끌어낼 수 없다 .Pull. –

+0

btw, Shape은 추상 클래스이고 deepCopy는 추상이기 때문에 재정의가 필요하지 않습니다. –