2017-12-19 5 views
1

나는이 두 싱크와 같은 스트림을 가지고 있지만, 하나는 한 번에 사용됩니다여러 싱크

Source.fromElements(1, 2, 3) 
.via(flow) 
.runWith(sink1) 

또는

Source.fromElements(1, 2, 3) 
.via(flow) 
.runWith(sink2) 

그것은 구성 할 수 있습니다 우리가 사용하는 싱크 ,하지만 두 개의 싱크를 동시에 사용한다면 어떨까요? 어떻게하면됩니까?

Sink.combine에 대해 생각했지만 병합 전략이 필요하며 어떤 식 으로든 이러한 싱크 결과를 결합하고 싶지 않습니다. 나는 그 (것)들을 진짜로 걱정하지 않는다, 그래서 동일한 데이타를 HTTP를 통해 어떤 종점에 보내고 동시에 그들을 데이타베이스에 보내고 싶다. 싱크 결합은 브로드 캐스트와 매우 비슷하지만 처음부터 브로드 캐스트를 구현하면 코드의 가독성이 떨어집니다. 이제 간단한 소스, 흐름 및 싱크가 있으며 저수준 그래프 단계가 없습니다.

내가 어떻게 하나의 싱크대를 사용하여 배압과 다른 것들을 가지고 있는지 어떻게 알 수 있습니까?

답변

4

당신은 (API docs 참조) alsoTo를 사용할 수 있습니다

Flow[Int].alsoTo(Sink.foreach(println(_))).to(Sink.ignore) 
+0

두 번째 싱크 앞에 간단한 .async를 추가하여 이러한 싱크를 병렬로 실행하는 방법은 무엇입니까? 병렬로 실행하고 싶지만 여전히 배압이 있습니다. 즉, 가장 느린 싱크대에서 보낸 시간만큼 빠른 속도로 실행되도록하고 모든 싱크대에서 소비 한 시간의 합계를 사용하지 않습니다. 이들은 동기식으로 실행되기 때문에). –

3

방송은 가독성을 감소하지 않아야 가장 간단한 형태로 GraphDSL를 사용하여 - 사실, 사람이 심지어 일부 방법으로 ~> 조항이를 시각화 주장 할 수 스트림 구조 :

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder => 
    import GraphDSL.Implicits._ 
    val bcast = builder.add(Broadcast[Int](2)) 

    Source.fromElements(1, 2, 3) ~> flow ~> bcast.in 
    bcast.out(0) ~> sink1 
    bcast.out(1) ~> sink2 

    ClosedShape 
}) 
graph.run() 
+0

이러한 싱크가 병렬로 실행됩니까? –

+0

기본적으로 Akka Streams는 그래프 처리 단계를 순차적으로 실행하지만 원하는 경우 'async' 메소드를 사용하여 병렬로 실행할 수 있습니다. 자세한 내용은이 주제의 [Akka Stream doc] (https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html?language=scala)을 참조하십시오. –