2016-11-05 9 views
6

에 의해 스트림을 분할 나는 다음과 같은 간단한 경우 클래스 계층이 있습니다Akka 스트림 형

sealed trait Message 
case class Foo(bar: Int) extends Message 
case class Baz(qux: String) extends Message 

을 그리고 나는 Flow[Message, Message, NotUsed] (코덱과 웹 소켓 기반 프로토콜에서 이미 자리에)있다.

Flow[Message]을 완전히 다른 경로로 처리되므로 Foo 및 Baz 유형에 대한 별도의 플로우로 역 다중화하려고합니다.

가장 간단한 방법은 무엇입니까? 분명해야하지만 뭔가를 놓치고 있습니다 ...

답변

5

한 가지 방법은 각 유형의 메시지에 대한 흐름을 포함하는 RunnableGraph를 만드는 것입니다.

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => 

    val in = Source(...) // Some message source 
    val out = Sink.ignore 

    val foo = builder.add(Flow[Message].map (x => x match { case [email protected](_) => f })) 
    val baz = builder.add(Flow[Message].map (x => x match { case [email protected](_) => b })) 
    val partition = builder.add(Partition[Message](2, { 
    case Foo(_) => 0 
    case Baz(_) => 1 
    })) 

    partition ~> foo ~> // other Flow[Foo] here ~> out 
    partition ~> baz ~> // other Flow[Baz] here ~> out 

    ClosedShape 
} 

g.run() 
+0

오른쪽, 파티션. 좋아, 그저 할 수있을거야. 아마 그것을 위해 내장 된 결합자를 갖는 것이 유익 할 것입니다; 아마도, 나는 풀 요청을 할 것이다. –

+0

@AlexanderTemerev 관심있어 : http://doc.akka.io/api/akka/2.4/?_ga=1.34091558.643806930.1478315511#akka.stream.scaladsl.Partition – Brian