2017-01-12 2 views
1

같은 종류의 마녀가 두 개의 무한 소스가 하나의 그래프에 연결될 수 있다고 가정합니다. 나는 이미 구체화 된 그래프 외부에서 그들 사이를 전환하고 싶습니다. KillSwitch를 사용하여 그 중 하나를 종료 할 수있는 것과 같은 방식 일 수 있습니다. 내가 source1를 사용하려면 기본적으로 내가 source2여러 소스간에 전환하는 방법은 무엇입니까?

source1 
     \ 
      switcher ~> source  

source2 

에서 데이터를 사용하려면 스위치 다음

val source1: Source[ByteString, NotUsed] = ??? 
val source2: Source[ByteString, NotUsed] = ??? 

val (switcher: Switcher, source: Source[ByteString, NotUsed]) = 
    Source.combine(source1,source2).withSwitcher.run() 

switcher.switch() 

그것은 Akka 스트림이 로직을 구현 할 수 있습니까?

답변

2

잠시 후 해결책을 찾았습니다.

여기서는 VLAN과 동일한 원칙을 사용할 수 있습니다. 내 소스에 태그를 지정하고 MergeHub을 통해 전달해야합니다. 그런 다음 태그로 해당 소스를 필터링하고 올바른 결과를 소스로 쉽게 생성 할 수 있습니다.

하나씩 전환해야하는 모든 것 소스는 필터 조건이 변경된 것입니다.

object SomeSource { 

    private var current = "tag1" 

    val source1: Source[ByteString, NotUsed] = ??? 
    val source2: Source[ByteString, NotUsed] = ??? 

    def switch = { 
    current = if (current == "tag1") "tag2" else "tag1" 
    } 

    val (sink: Sink[(String, ByteString), NotUsed], 
     source: Source[ByteString, NotUsed]) = 
    MergeHub.source[(String, ByteString)] 
     .filter(_._1 == current) 
     .via(Flow[(String, ByteString)].map(_._2)) 
     .toMat(BroadcastHub.sink[ByteString])(Keep.both).run() 

    source1.map(s => ("tag1", s)).runWith(sink) 
    source2.map(s => ("tag2", s)).runWith(sink) 

} 

SomeSource.source // do something with Source 

SomeSource.switch() // then switch 
: 여기
source1.map(s => (tag1, s)) 
          \ 
          MergeHub.filter(_._1 == tagX).map(_._2) -> Source 
         /
source2.map(s => (tag2, s)) 

일부 예는
관련 문제