2014-12-05 5 views
4

나는 라이브 데이터가있는 스트림과 함께 속한 라이브 데이터의 일부를 구분하는 스트림을 가지고 있습니다. 이제 누군가가 실시간 데이터 스트림에 가입하면 실시간 데이터를 재생하고 싶습니다. 그러나 나는 모든 실시간 데이터를 기억하고 싶지 않습니다. 다른 스트림이 값을 내 보낸 마지막 시간 이후의 부분 만 기억합니다.관찰 가능 버퍼 제한

There is an issue 내 문제를 해결할 것입니다. 정확하게 (또는 적어도 내가 생각하는) 재생 작업자가 있기 때문입니다.

현재이 작업을 쉽게 수행 할 수있는 방법은 무엇입니까? 다음과 같은 것보다 더 좋은 방법이 있습니까?

private class ReplayWithLimitObservable<TItem, TDelimiter> : IConnectableObservable<TItem> 
{ 
    private readonly List<TItem> cached = new List<TItem>(); 
    private readonly IObservable<TDelimiter> delimitersObservable; 
    private readonly IObservable<TItem> itemsObservable; 
    public ReplayWithLimitObservable(IObservable<TItem> itemsObservable, IObservable<TDelimiter> delimitersObservable) 
    { 
     this.itemsObservable = itemsObservable; 
     this.delimitersObservable = delimitersObservable; 
    } 

    public IDisposable Subscribe(IObserver<TItem> observer) 
    { 
     lock (cached) 
     { 
      cached.ForEach(observer.OnNext); 
     } 

     return itemsObservable.Subscribe(observer); 
    } 

    public IDisposable Connect() 
    { 
     var delimiters = delimitersObservable.Subscribe(
      p => 
       { 
        lock (cached) 
        { 
         cached.Clear(); 
        } 
       }); 
     var items = itemsObservable.Subscribe(
      p => 
       { 
        lock (cached) 
        { 
         cached.Add(p); 
        } 
       }); 
     return Disposable.Create(
      () => 
       { 
        items.Dispose(); 
        delimiters.Dispose(); 
        lock (cached) 
        { 
         cached.Clear(); 
        } 
      }); 
} 

public static IConnectableObservable<TItem> ReplayWithLimit<TItem, TDelimiter>(IObservable<TItem> items, IObservable<TDelimiter> delimiters) 
{ 
    return new ReplayWithLimitObservable<TItem, TDelimiter>(items, delimiters); 
} 
+0

그냥 생각에 잠금 인종의 모든 조건을 떠나있는 장점이있다 ...'ConcurrentBag 는'캐시'에'lock'을 사용하는 것보다 더 나은 옵션이되지 않을 것 '? 내 말은, 그것이 그것을 위해 설계된 것입니다 ... – toadflakz

+1

@toadflakz - AFAIK, ConcurrentBag는 추가 순서 유지를 보장하지 않습니다 (관찰 가능한 항목이 올바르게 작동하면 List를 사용하여 올바른 순서로 항목을 가져옵니다). ConcurrentQueue는이를 해결할 수 있지만, 목록을 지우는 것은 ConcurrentQueue를 지우는 것보다 쉽습니다. –

+0

설명 주셔서 감사합니다. 실시간 데이터 개발에 관심이있어서 경험이 풍부한 사람들의 코드 디자인 의사 결정에 대한 감사를드립니다. – toadflakz

답변

4

이 작업이 원하는 작업을 수행합니까? 그것은 수신 전문가 :

private class ReplayWithLimitObservable<T, TDelimiter> : IConnectableObservable<T> 
{ 
    private IConnectableObservable<IObservable<T>> _source; 

    public ReplayWithLimitObservable(IObservable<T> source, IObservable<TDelimiter> delimiter) 
    { 
    _source = source 
     .Window(delimiter) // new replay window on delimiter 
     .Select<IObservable<T>,IObservable<T>>(window => 
     { 
     var replayWindow = window.Replay(); 

     // immediately connect and start memorizing values 
     replayWindow.Connect(); 

     return replayWindow; 
     }) 
     .Replay(1); // remember the latest window 
    } 

    IDisposable Connect() 
    { 
    return _source.Connect(); 
    } 

    IDisposable Subscribe(IObserver<T> observer) 
    { 
    return _source 
     .Concat() 
     .Subscribe(observer); 
    } 
} 

public static IConnectableObservable<TItem> ReplayWithLimit<TItem, TDelimiter>(IObservable<TItem> items, IObservable<TDelimiter> delimiters) 
{ 
    return new ReplayWithLimitObservable<TItem, TDelimiter>(items, delimiters); 
} 
+0

이 방법이 더 좋지만,'IConnectableObservable '을 구현해야한다는 사실을 여전히 좋아하지 않습니다. 그건 사실이 아니야, 그렇지 않니? 당신은 너무 피할 수 없지만 여전히 여기까지 .......- –

+0

적어도 [this] (https://github.com/Reactive-Extensions/Rx.NET/issues/54) ReplaySubject '의 "반응/반응성"버전이 가장 좋을지라도. 이 문제는 다음 코드 라인에서이 문제를 해결할 것입니다 :'return xs.Multicast (new ReplaySubject (ys)); ' –

+0

@DaveSexton - 완전히 동의합니다.이 질문에 대한 나의 반응은 실제로'IConnectableObservable '의 팩토리가 Rx에 오신 것을 환영합니다. –