2013-04-18 1 views
1

내가 들어오는 연결로 노출 한 :결합 IObservable <IObservable <T>>은 IList에 <T>는

IObservable<double> 

새로운 연결이 때마다 내가 통해 통지 :

IObservable<IObservable<double>> m_IncomingConnections; 

내 문제입니다 I 내가 할 수있는 연산자를 만들고 싶습니다.

IObservable<IList<double>> m_joined = m_IncomingConnections.JoinAll(); 

처음에는 m_joined 관측치가 Lists를 세 개의 요소로 푸시 할 3 개의 들어오는 연결이 있다고 가정합니다.

|--a1----a2----------a3------- 
|b1---b2----b3--b4---------b5- 
|---c1----c2---c3--c4--------- 

결과 :

>[a1,b1,c1] 
>[a1,b2,c1] 
>[a2,b2,c1] 
>[a2,b2,c2] 
>[a2,b3,c2] 

당신은 아이디어를 얻었으나, 새로운 연결하면 나는 새 연결이 새 값을 밀어 자마자 크기가 4 인이 될 이제 배열을하고 싶습니다 제공됩니다.

이 모든

는 지금 모든 스트림의 평균 값에 새 값이 밀어 때마다 스트리밍하려면

m_joined.Select(vv => vv.Average()); 

을 사용할 수 있도록합니다.

수신을 위해 이러한 확장을 구현하는 방법에 대한 어떤 제안 ?

도움 주셔서 감사합니다.

답변

1

주된 문제는 관측 가능 목록을 관측 가능 목록으로 변환하는 것입니다.

IObservable<List<T>> CombineNLatest<T>(List<IObservable<T>> list){ 

    return list 
     .Aggregate 
      (Observable.Return(new List<T>()) 
      , (a, n) => Observable.CombineLatest 
       (a 
       , b 
       , (l, e) => l.Concat(e).ToList())); 
} 

테스트를 해보지는 않았지만 그와 비슷한 기능을 제공합니다. 지금 당신은 당신이 당신의 목록을 구축 에 필요로하는 문제>

IObservable<List<IObservable<T>> 
CollectList<T> 
(this IObservable<IObservable<T>> input) 
{ 
    var initial = new List<IObservable<T>>(); 
    return Observable.Create(observer => { 

     return input.Subscribe(o => { 

      initial.Append(o); 
      observer.OnNext(initial); 

     } 

    }); 
} 

가 다시 테스트하지를하지만 일반적인 생각은 분명하다 바랍니다. 이제 함께 연결하십시오

IObservable<IObservable<T>> input = ...; 
IObservable<IObservable<List<T>>> intermediate = input 
    .CollectList() 
    .Select(list => CombineNLatest(list)); 

IObservable<List<T>> final = intermediate.Switch(); 
관련 문제