2011-10-28 2 views
1

관찰 대상의 최신 값을 20 가지 얻고이를 블로킹없이 속성으로 표시하려고합니다. MostRecentBars 게터가 호출 그러나IObservable TakeLast (n) and blocking

class Foo 
{ 
    private IObservable<int> observable; 

    public Foo(IObservable<int> bar) 
    { 
     this.observable = bar; 
    } 

    public IEnumerable<int> MostRecentBars 
    { 
     get 
     { 
      return this.observable.TakeLast(20).ToEnumerable(); 
     } 
    } 
} 

, 최소 20 개 관측 값이 될 때까지 ToEnumerable는 반환하지 않습니다 아마도 때문에,이, 차단 :이 순간, 내 코드는 것 같습니다.

차단없이 관찰 기록의 가장 최근 값을 최대 20 개까지 노출시키는 기본 제공 방법이 있습니까? 관찰 된 값이 20 개 미만이면 모든 값을 반환해야합니다.

+0

더 소위 없습니다 TakeLast 방법있다 IObservable ojlovecd

답변

0

귀하의 요구 사항에 맞는 내장형 Rx 연산자를 생각할 수 없습니다. 다음과 같이 구현할 수 있습니다.

class Foo 
{ 
    private IObservable<int> observable; 
    private Queue<int> buffer = new Queue<int>(); 

    public Foo(IObservable<int> bar) 
    { 
     this.observable = bar; 

     this.observable 
      .Subscribe(item => 
      { 
       lock (buffer) 
       { 
        if (buffer.Count == 20) buffer.Dequeue(); 
        buffer.Enqueue(item); 
       } 
      }); 
    } 

    public IEnumerable<int> MostRecentBars 
    { 
     get 
     { 
      lock (buffer) 
      { 
       return buffer.ToList();  // Create a copy. 
      } 
     } 
    } 
} 
2

두 가지를 선택하겠습니다. 하나는 Rx Scan 연산자를 사용하지만, 나는 그것을 읽는 것이 좀 더 복잡하다고 생각합니다. 다른 하나는 잠금 장치 인 Queue 표준을 사용합니다. 선택할 수 있습니다.

(1)

class Foo 
{ 
    private int[] bars = new int[] { }; 

    public Foo(IObservable<int> bar) 
    { 
     bar 
      .Scan<int, int[]>(
       new int[] { }, 
       (ns, n) => 
        ns 
         .Concat(new [] { n, }) 
         .TakeLast(20) 
         .ToArray()) 
      .Subscribe(ns => bars = ns); 
    } 

    public IEnumerable<int> MostRecentBars 
    { 
     get 
     { 
      return bars; 
     } 
    } 
} 

(2)

class Foo 
{ 
    private Queue<int> queue = new Queue<int>(); 

    public Foo(IObservable<int> bar) 
    { 
     bar.Subscribe(n => 
     { 
      lock (queue) 
      { 
       queue.Enqueue(n); 
       if (queue.Count > 20) 
       { 
        queue.Dequeue(); 
       } 
      } 
     }); 
    } 

    public IEnumerable<int> MostRecentBars 
    { 
     get 
     { 
      lock (queue) 
      { 
       return queue.ToArray(); 
      } 
     } 
    } 
} 

난이 도움이되기를 바랍니다. 이미 답을 가지고 있지만

+0

에 :

귀하의 케이스를 들어

, 당신은 할 수 있어야 최선의 방법으로 그 접근 방식을 따르십시오. 우리의 솔루션이이 솔루션에 얼마나 근접해 있는지는 놀랍습니다. 나는 내 게시 후까지 그의 모습을 보지 않았다. 그럼에도 불구하고'Scan' 연산자는 종종 간과 되나 많은 상황에서 매우 유용합니다. – Enigmativity

+0

나는 실제로'Scan'을 사용하는 첫 번째 접근법을 선호한다. 그것은 "last (up to) n"이상으로 관찰 가능한 체인을 계속할 수있게한다. 실제로, 'var recentAverageScore = scores.TakeUpToLast (25) .Average();'또는 'IObservable '결과로 수행하려는 작업을 확장 메소드로 전환하는 것이 좋습니다. –

0

, 나는 버퍼를 사용하여이 재생 주제를 해결하는 생각과 같은 것을 함께했다되었습니다

class Foo 
{ 
    private ReplaySubject<int> replay = new ReplaySubject<int>(20); 

    public Foo(IObservable<int> bar) 
    { 
     bar.Subscribe(replay); 
    } 

    public IEnumerable<int> MostRecentBars 
    { 
     get 
     { 
      var result = new List<int>(); 
      replay.Subscribe(result.Add); //Replay fill in the list with buffered items on same thread 
      return result; 
     } 
    } 
} 

이이 문제에 맞는 알려줘.

1

나는 내가 반응 확장 빌드 모든 프로젝트에 첨부하는 경향이 몇 가지 확장 기능을 가지고, 그들 중 하나는 슬라이딩 윈도우입니다 : 이것은 가장 최근의 N 항목의 배열을 반환

public static IObservable<IEnumerable<T>> SlidingWindow<T>(this IObservable<T> o, int length) 
{ 
    Queue<T> window = new Queue<T>(); 

    return o.Scan<T, IEnumerable<T>>(new T[0], (a, b) => 
    { 
     window.Enqueue(b); 
     if (window.Count > length) 
      window.Dequeue(); 
     return window.ToArray(); 
    }); 
} 

(또는 N 항목이 아직없는 경우는 적음). 지금 Ilian 핀존가 꽤 많이 나는 것과 동일한`Queue` 구현을 쓴 본 후

class Foo 
{ 
    private IObservable<int> observable; 
    private int[] latestWindow = new int[0]; 

    IDisposable slidingWindowSubscription; 

    public Foo(IObservable<int> bar) 
    { 
     this.observable = bar; 
     slidingWindowSubscription = this.observable.SlidingWindow(20).Subscribe(a => 
      { 
       latestWindow = a; 
      }); 
    } 

    public IEnumerable<int> MostRecentBars 
    { 
     get 
     { 
      return latestWindow; 
     } 
    } 
}