2012-01-31 4 views
0

.NET Reactive Extensions를 사용하여 로그 이벤트를 관찰했습니다. 현재 IObservable에서 파생 된 클래스를 사용하고 ReplaySubject를 사용하여 로그를 저장합니다. 그런 식으로 로그를 필터링하고 재생할 수 있습니다 (예 : 모든 오류 로그 표시 또는 모든 자세한 로그 표시). 버퍼링 한 로그를 손실하지 않습니다..NET Rx - ReplaySubject 버퍼 크기가 작동하지 않습니다.

문제는 그 주제에 버퍼 크기를 설정 한 경우에도입니다 :

this.subject = new ReplaySubject<LogEvent>(10); 

내가에 관찰 컬렉션에 추가 할 OnNext를 사용할 때 내 프로그램의 메모리 사용량이 지붕을 통해서 간다 무한 루프 :

internal void WatchForNewEvents() 
     { 
      Task.Factory.StartNew(() => 
       { 
        while (true) 
        { 
         dynamic parameters = new ExpandoObject(); 
         // TODO: Add parameters for getting specific log events 

         if (this.logEventRepository.GetManyHasNewResults(parameters)) 
         { 
          foreach (var recentEvent in this.logEventRepository.GetMany(parameters)) 
          { 
           this.subject.OnNext(recentEvent); 
          } 
         } 

         // Commented this out for now to really see the memory go up 
         // Thread.Sleep(1000); 
        } 
       }); 
     } 

ReplaySubject의 버퍼 크기가 작동하지 않습니까? 버퍼 크기에 도달하면 버퍼를 지우지 않는 것 같습니다. 도움이 많이 감사!

UPDATE :

나는 추가 같은 가입자 (이 잘못인가?) :

처럼라고
public IDisposable Subscribe(IObserver<LogEvent> observer) 
     { 
      return this.subject.Subscribe(observer); 
     } 

... :

// Inserts into UI ListView 
    this.logEventObservable.Subscribe(evt => this.InsertNewLogEvent(evt)); 
+0

ReplaySubject에 구독자를 추가하면 어떻게됩니까? 나는 이것이 누설해서는 안된다 고 생각하지만, 여전히 무슨 일이 일어나고 있는지 궁금하다. –

+0

감사합니다. 폴. 구독자를 추가하는 방법에 대한 자세한 정보를 추가했습니다. –

+0

아직 해결 방법을 모르지만 문제를 격리했습니다. 루프에서 OnNext를 호출하기 전에 제목을 구독하면 메모리 누수가 방지됩니다. 구독 후에 AFTER가 발생합니다. –

답변

0

내가 잘 모르겠어요 이것은 확실한 대답이지만, 사용중인 스케줄러 주변의 동시성으로 인해 문제가 발생했다고 생각됩니다. 당신이 ReplaySubject에 전화하는거야 생성자는 다음과 같습니다

public ReplaySubject(int bufferSize) 
    : this(bufferSize, TimeSpan.MaxValue, Scheduler.CurrentThread) 
{ } 

Scheduler.CurrentThread 나를 걱정. Scheduler.ThreadPool으로 변경하고 도움이되는지 확인해보십시오.

또한 사이드 노트로 Rx와 TPL 및 구식 스레드 자고있는 것처럼 보입니다. 일반적으로 그렇게하지 않는 것이 가장 좋습니다. 당신은 다음과 같이 당신의 WatchForNewEvents 코드를 변경할 수 있습니다 : 일을의 좋은 컴팩트 한 RX-Y 방식의

dynamic parameters = new ExpandoObject(); 

var newEvents = 
    from n in Observable.Interval(TimeSpan.FromSeconds(1.0)) 
    where this.logEventRepository.GetManyHasNewResults(parameters) 
    from recentEvent in 
     this.logEventRepository.GetMany(parameters).ToObservable() 
    select recentEvent; 

newEvents.Subscribe(this.subject); 

합니다.

+0

스케줄러를 변경해도 영향을 미치지 않았지만 제안 해 주셔서 감사합니다. IObservable의 구현이 잘못되었는지 궁금합니다. –

+0

@NickRamirez - 내 직감이 잘 풀리지 않아서 미안합니다. 'IObservable'을 구현하지 않을 것을 권장합니다 - 이것은 종종 더 복잡해지며 잘못 이해하기 쉽습니다. 대신 내장 연산자를 사용하십시오. – Enigmativity

+0

결국 TPL 대신 Observable.Interval을 사용하여 제안한대로 메모리 누수를 수정했습니다. 감사! –

관련 문제