2012-04-20 2 views
4

여러 개의 (2 ~ 10) 뜨거운 관찰 가능 항목이 있는데, 각각 주어진 시간 창에서 하나의 이벤트가 발생하지만 순서는 다릅니다.여러 관찰 가능 항목의 순서 동기화

내가 3 개 과목이 있고, 다음과 같은 임의의 순서로 그들이 불을 설명하기 위해 말할 수 있습니다 :

Subjects: sA, sB, sC 

Order of time window 1: 
    sB, then sA, then sC 
Order of time window 2: 
    sA, then sC, then sB 
Order of time window 3: 
    sA, then sB, then sC 
etc. 

를 그래서 나는 다음과 같은 스트림 얻을 것이다 병합 할 것입니다 경우 : SB, SA 사우스 캐롤라이나, SA를, sC, sB, sA, sB, sC 등

자, 이제 제 질문은 다음과 같습니다. 각 시간 창 내에서 이벤트 순서를 강제하고 싶습니다. 병합 된 스트림 다음 초래

1. sA 
2. sB 
3. sC 

: SA, SB, 사우스 캐롤라이나 SA, SB, 사우스 캐롤라이나 SA 등

SB, 사우스 캐롤라이나 윈도우 시간 자체는 알려져 있지 않지만 각 이후 이벤트는 각 윈도우에서 정확히 한 번 실행됩니다. 모든 피사체가 해고되면 윈도우가 닫히고 새로운 창이 열립니다.

아이디어 우아하게하는 방법?

확장 질문 (중요하지 않음) : 관찰 가능 항목은 서로 독립적이므로 각 관찰 가능 항목의 순서를 동기화 할 수있는 공통 정적 방법을 소개해야합니다.

public static IObservable<T> SynchronizeOrder<T>(IObservable<T> source, int order) 
{ 
    //return synchronized source 
} 

업데이트 : 뭔가 내 원래의 질문에 분명 아니었다 관찰 가능한 다른 이벤트 유형의 수 (나는 내 ​​예제에서 특정 이벤트 데이터 형식을 사용하지 않은 그 이유는) 결과적 수 있었다 나는 명령 된 관찰 대상을 병합하고 싶지 않다. 독립적 인 관측치가 순서대로 동기화되도록 보장 할 수있는 메커니즘이 필요합니다.

myEventStream 
.ObserveOn(TaskFactory) 
.DoSomeExpensiveComputation() 
.Order(2) //doesn't have to be an extension method 
.Subscibe(computationResult=> 
      sharedRessourceWhereOrderMatters.Update(computationResult)) 
+0

각 창을 엽니 무엇? 'sA'가 각 창을 닫지 만 어떤 창을 열지 모른다면'sB'와'sC'를 얼마나 오래 붙잡을 지 알 수 없습니다. – yamen

+0

좋은 지적, 제 질문이 충분히 명확하지 않아서 업데이트했습니다. – lukebuehler

답변

3

이것은 당신이 관찰 사용하여 원하는 것을 조인 :

using System.Reactive; 
    using System.Reactive.Subjects; 
    using System.Reactive.Linq; 

    var a = new Subject<int>(); 
    var b = new Subject<int>(); 
    var c = new Subject<int>(); 

    var test = 
     Observable.When(
      a.And(b).And(c).Then((a1, b1, c1) => new int[] { a1, b1, c1 }) 
     ).SelectMany(arr => arr.ToObservable()); 

    var sub = test.Subscribe(val => Console.Write("{0} ", val)); 

    a.OnNext(1); 
    b.OnNext(2); 
    c.OnNext(3); 

    a.OnNext(1); 
    c.OnNext(3); 
    b.OnNext(2); 

    c.OnNext(3); 
    a.OnNext(1); 
    b.OnNext(2); 

    Console.ReadKey(); 
    sub.Dispose(); 

출력은 다음과 같습니다

1 2 3 1 2 3 1 2 3 

주의 그러나 그 :

  1. 이 제공하지 않습니다 확장 메소드를 사용하면 관찰 가능한 특정 순서로 주사하십시오. 당신은 And (단편적 일 수있다)을 사용하여 계획을 세우고, 결국 그것을 실행할 필요가있다. '빌드 업'순서가 출력 순서입니다.

  2. 실제 창 만들기는 표시되지 않습니다. 그러나 어딘가에서 관찰 가능한 체인에 주입 할 수없는 이유는 없습니다.

  3. 이 솔루션은 세 관찰 변수가 모두 값을 가질 때마다 번을 번 실행합니다. 원하는 동작을 얻으려면 설정하고 올바른 지점에서 찢어 야 할 수도 있습니다.

+0

다른 솔루션을 사용했지만 결국 문제에 대해 생각해 줬습니다. – lukebuehler

1

yamens 답은 동일한 범위에있는 동일한 유형의 관찰 가능 항목에 대해 큽니다. 내 경우에는 (확장 질문) 관측 대상이 다른 유형이 될 수 있으며 실행 순서에 관계없이 순서를 동기화 할 수 있습니다.그래서 여기

내 제안 간단한 해결책 :

private readonly object syncRoot = new object(); 
private int sourceCount; 
private readonly SortedDictionary<int, Action> buffer = new SortedDictionary<int, Action>(); 

public IObservable<T> Order<T>(IObservable<T> source, int order) 
{ 
    return Observable.Create<T>(observer => 
    { 
     lock (syncRoot) 
      sourceCount++; 

     source.Synchronize(syncRoot).Subscribe(item => 
     { 
      buffer[order] =() => observer.OnNext(item); 
      if(buffer.Count == sourceCount) 
      { 
       foreach (var action in buffer.Values) 
        action(); 
       buffer.Clear(); 
      } 
     }, observer.OnError, observer.OnCompleted); 

     return() => 
     { 
      lock (syncRoot) 
       sourceCount--; 
     }; 
    }); 
}//end order 
+0

나는 당신이 의미하는 바를 봅니다 : 서로 다른 시간에 동기화하지만, 여기 관찰 가능 유형은 같습니다. 결국 관측 가능한 스트림을 병합하는 경우 끝에 단일 통일 유형이 필요하며 내 대답은이를 제공 할 수 있습니다 (필요한 경우 'Then'에서 다른 선택 기능 사용). 당신의 솔루션은 올바른 생각을 가지고 있지만, 어떤 예외 시나리오에서도 약해질 것입니다 (예를 들어 하나의 관측 대상이 한 번 이상 발사 됨). – yamen

+0

예 사실, 결국 병합이 필요하다는 잘못된 인상을주었습니다. 결과 순서를 설명하기를 원했습니다. 필자의 경우에는 동일한 데이터 스트림을 비동기 적으로 처리하는 여러 독립적 인 구성 요소 (플러그인과 거의 같은)가 있습니다 (예 : 일부 계산). 그들은 계산의 특정 지점 이후에 관찰 가능 항목의 연속 순서를 동기화 할 수있는 싱글 톤 또는 서비스를 공유합니다. 나는 주로 특정 시나리오에서 구성 요소가 우선 순위를 갖기 때문에이 작업을 수행하려고하지만 구성 요소 개발자에게 맡기고 싶습니다. – lukebuehler

+0

아, 그 말이 더 합리적입니다. 행운을 빌어 요. – yamen

관련 문제