2017-02-23 1 views
1

이것은 GraphDSL API를 사용하는 정말 간단하고 초보적인 질문입니다. 나는 여러 스레드 SO 관련 읽고 난 대답을 볼 수 없습니다 :Akka Streams : GraphDSL API에서 Materialized Sink 출력을 얻으려면 어떻게해야합니까?

val actorSystem = ActorSystem("QuickStart") 
val executor = actorSystem.dispatcher 
val materializer = ActorMaterializer()(actorSystem) 

val source: Source[Int, NotUsed] = Source(1 to 5) 
val throttledSource = source.throttle(1, 1.second, 1, ThrottleMode.shaping) 
val intDoublerFlow = Flow.fromFunction[Int, Int](i => i * 2) 
val sink = Sink.foreach(println) 

val graphModel = GraphDSL.create() { implicit b => 
    import GraphDSL.Implicits._ 

    throttledSource ~> intDoublerFlow ~> sink 

    // I presume I want to change this shape to something else 
    // but I can't figure out what it is. 
    ClosedShape 
} 
// TODO: This is RunnableGraph[NotUsed], I want RunnableGraph[Future[Done]] that gives the 
// materialized Future[Done] from the sink. I presume I need to use a GraphDSL SourceShape 
// but I can't get that working. 
val graph = RunnableGraph.fromGraph(graphModel) 

// This works and gives me the materialized sink output using the simpler API. 
// But I want to use the GraphDSL so that I can add branches or junctures. 
val graphThatIWantFromDslAPI = throttledSource.toMat(sink)(Keep.right) 

답변

4

트릭은 구체화 된 값 (귀하의 경우 sink)을 원하는 단계를 GraphDSL.create으로 전달하는 것입니다. 두 번째 매개 변수로 전달하는 함수도 변경되어 그래프에 사용할 수있는 Shape 입력 매개 변수 (아래 예에서는 s)가 필요합니다.

val graphModel: Graph[ClosedShape, Future[Done]] = GraphDSL.create(sink) { implicit b => s => 
    import GraphDSL.Implicits._ 

    throttledSource ~> intDoublerFlow ~> s 

    // ClosedShape is just fine - it is always the shape of a RunnableGraph 
    ClosedShape 
    } 
    val graph: RunnableGraph[Future[Done]] = RunnableGraph.fromGraph(graphModel) 

자세한 내용은 docs에서 찾을 수 있습니다.

+0

내가 도와 준 문서 참조를 위해 upvoted ;-) –

+1

ahh 결승선에 맞음;) 잘 연주 –

+0

당신 덕분에. GraphDSL.create (sink) 호출에 싱크를 추가하면 ClosedShape에 컴파일러 오류가 발생합니다. 어떻게 업데이트합니까? – clay

3
val graphModel = GraphDSL.create(sink) { implicit b: Builder[Future[Done]] => sink => 
    import akka.stream.scaladsl.GraphDSL.Implicits._ 

    throttledSource ~> intDoublerFlow ~> sink 

    ClosedShape 
} 
val graph: RunnableGraph[Future[Done]] = RunnableGraph.fromGraph(graphModel)  
val graphThatIWantFromDslAPI: RunnableGraph[Future[Done]] = throttledSource.toMat(sink)(Keep.right) 

GraphDSL의 API의 문제는, 암시 적 Builder는 크게 오버로드됩니다. create에 싱크대를 두어 Builder[NotUsed]Builder[Future[Done]]으로 바꾸고 builder => shape 대신 builder => sink => shape의 기능을 나타냅니다.

+0

감사합니다. 'GraphDSL.create'에 싱크 파라미터를 추가하면'ClosedShape' 라인에 새로운 컴파일러 에러가 생깁니다. 어떻게 생각하니? – clay

+0

죄송합니다, 다른 답변에 대한 답변 :-) –

관련 문제