2016-07-21 3 views
0

하나 이상의 Observables를 보유하고이를 병합하는 Observable 구현이 필요합니다. 하지만 여기 키커가 있습니다 : 나는 을 추가합니다. Observables를 추가하면 언제든지 병합 할 수 있습니다. 또한 제거하는 것이 좋습니다.RxJava - 언제든지 더 많은 Observables를 허용하는 Merged Observable?

정말 효과적이기 위해서는 모든 구독자가 구독 후 추가 된 새로운 Observable에서 알림을 받아야합니다. 병합 된 Observables가 모두 콜드가 아니고 onComplete()으로 전화하지 않는 한, Observables가 더 추가 된 경우에도 가입을 취소 할 수 있습니다. 이는 여러 개의 무한한 Observable을 병합하고 언제든지 추가 할 수있는 기능입니다.

MergableObservable<MyEvent> allSources = new MergableObservable<>(); 

//later in application 
Observable<MyEvent> eventSource1 = ... 
allSources.add(eventSource1); 

//and later again 
Observable<MyEvent> eventSource2 = ... 
allSources.add(eventSource2); 

//and so on 
Observable<MyEvent> eventSource3 = ... 
allSources.add(eventSource3); 

나는 병합 연산자가 있지만 가변 구조가 필요하다는 것을 알고있다. 이미 존재하는 것을 놓치고 있습니까? 나는이 상황에 절대적으로 적합한 것이 아닌 한 주제를 사용하지 않는 것을 선호합니다.

답변

2

코 틀린 구현입니다.

Subject<Observable<T>, Observable<T>> o = PublishSubject.<Observable<T>>().toSerialized(); 

ConcurrentHashSet<Observable<T>> live = ... 

o.flatMap(v -> v.takeWhile(x -> live.containsKey(v))).subscribe(...); 

Observable<T> inner = ... 
live.add(inner); 
o.onNext(inner); 

//... 

live.remove(inner); 
+0

니스. 그건 꽤 똑똑 하네. – tmn

0

어쩌면 주제가이를 처리하는 가장 합리적인 방법 일 수 있습니다. 다른 사람이 더 좋은 방법을 알고 있다면 나는 다른 답변에 대해 열려 있습니다. 그것은 일종의 내게 이것을 죽일 내장 된 방법이 없습니다. 나는 그것이 일반적인 필요가 있다고 상상한다.

당신은 수동으로 새로운 Observable 소스를 밀어 원하기 때문에 주제를 피할와 "자연"을 생성 할 수 없습니다

class MergingObservable<T> { 

    private val subject: SerializedSubject<T, T> = PublishSubject<T>().toSerialized() 

    fun toObservable(): Observable<T> = subject 

    operator fun plusAssign(observable: Observable<T>) { 
     add(observable) 
    } 

    fun add(observable: Observable<T>): Subscription = observable.subscribe(subject) 
} 
관련 문제