2014-12-27 2 views
3

여러 관찰자와 관찰 가능한 간격을 실행하는 응용 프로그램이 있습니다. 간격이 0.5 초마다 웹 서버에서 일부 XML 데이터를로드하면 관찰자는 백그라운드 스레드에서 응용 프로그램 특정 처리를 수행합니다. 데이터가 더 이상 필요하지 않으면 구독 및 간격 관찰이 처리되므로 관찰자의 OnNext/OnCompleted/OnError는 더 이상 호출되지 않습니다. 여태까지는 그런대로 잘됐다.잠금 장치를 사용하지 않고 Rx 관찰자가 완료 될 때까지 기다리십시오.

내 문제 : 드물게 내 Observer의 OnNext 메서드를 호출 한 후에도 여전히 실행 중일 수 있습니다! 폐기 후 추가 작업을 진행하기 전에 OnNext가 완료되었는지 확인하고 싶습니다.

내 현재 해결 방법 : 내 관찰자 클래스 (코드 참조)에 보관함 필드를 도입했습니다. 처리를 마친 후 잠금을 획득하고 잠금을 획득 한 후에 만 ​​계속 시도합니다. 이 솔루션이 작동하는 동안 (?), 그것은 어떻게 든 저에게 잘못 느끼는 것입니다.

질문 :이 문제를 해결하기 위해보다 우아하고 "Rx Way"가 있습니까?

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Reactive.Concurrency; 
using System.Reactive.Linq; 
using System.Text; 
using System.Threading; 
using System.Threading.Tasks; 

namespace RxExperimental 
{ 
    internal sealed class MyXmlDataFromWeb 
    { 
     public string SomeXmlDataFromWeb { get; set; } 
    } 

    internal sealed class MyObserver : IObserver<MyXmlDataFromWeb> 
    { 
     private readonly object _locker = new object(); 
     private readonly string _observerName; 

     public MyObserver(string observerName) { 
      this._observerName = observerName; 
     } 

     public object Locker { 
      get { return this._locker; } 
     } 

     public void OnCompleted() { 
      lock (this._locker) { 
       Console.WriteLine("{0}: Completed.", this._observerName); 
      } 
     } 

     public void OnError(Exception error) { 
      lock (this._locker) { 
       Console.WriteLine("{0}: An error occured: {1}", this._observerName, error.Message); 
      } 
     } 

     public void OnNext(MyXmlDataFromWeb value) { 
      lock (this._locker) { 
       Console.WriteLine(" {0}: OnNext running on thread {1}... ", this._observerName, Thread.CurrentThread.ManagedThreadId); 
       Console.WriteLine(" {0}: XML received: {1}", this._observerName, value.SomeXmlDataFromWeb); 
       Thread.Sleep(5000); // simulate some long running operation 
       Console.WriteLine(" {0}: OnNext running on thread {1}... Done.", this._observerName, Thread.CurrentThread.ManagedThreadId); 
      } 
     } 
    } 

    internal sealed class Program 
    { 
     private static void Main() { 
      const int interval = 500; 
      // 
      var dataSource = Observable.Interval(TimeSpan.FromMilliseconds(interval), NewThreadScheduler.Default).Select(_ => { 
       var data = new MyXmlDataFromWeb { 
        SomeXmlDataFromWeb = String.Format("<timestamp>{0:yyyy.MM.dd HH:mm:ss:fff}</timestamp>", DateTime.Now) 
       }; 
       return data; 
      }).Publish(); 
      // 
      var observer1 = new MyObserver("Observer 1"); 
      var observer2 = new MyObserver("Observer 2"); 
      // 
      var subscription1 = dataSource.ObserveOn(NewThreadScheduler.Default).Subscribe(observer1); 
      var subscription2 = dataSource.ObserveOn(NewThreadScheduler.Default).Subscribe(observer2); 
      // 
      var connection = dataSource.Connect(); 
      // 
      Console.WriteLine("Press any key to cancel ..."); 
      Console.ReadLine(); 
      // 
      subscription1.Dispose(); 
      subscription2.Dispose(); 
      connection.Dispose(); 
      // 
      lock (observer1.Locker) { 
       Console.WriteLine("Observer 1 completed."); 
      } 
      lock (observer2.Locker) { 
       Console.WriteLine("Observer 2 completed."); 
      } 
      // 
      Console.WriteLine("Can only be executed, after all observers completed."); 
     } 
    } 
} 
+0

콜백. Rx는 효과적으로 콜백의 파이프 라인이므로 소스 제작자를 차단하게됩니다. 대신에 메시지 전달 디자인을 고려해 보거나 OnNext 처리기가 비동기의 또 다른 계층을 도입하는 모습을 보길 원할 수도 있습니다 (중첩 된 관찰 가능한 시퀀스 참조). –

+0

또한 IObserver 을 구현하지 않을 것을 제안합니다 (또는 IObservable ) 인터페이스. 대신 연산자를 사용하여 쿼리를 작성하십시오. –

답변

3

네, 여기에는 더 많은 Rx 방식이 있습니다.

첫 번째 관찰은 관찰 가능한 스트림의 구독 취소가 본질적으로 관찰자 내에서 일어나고있는 것과는 독립적이라는 점입니다. 실제로 어떤 피드백도 없습니다. 관측이 끝났을 때 명확하게 알고 있어야한다는 요구 사항이 있으므로 관측 가능한 스트림으로이를 모델링해야합니다. 즉, 스트림에서 수신 거부하는 대신 스트림으로 완료해야 OnComplete 이벤트를 관찰 할 수 있습니다. 귀하의 경우에는 TakeUntil을 사용하여 Observable을 구독 취소하는 대신 종료 할 수 있습니다.

두 번째 관찰은 "관찰자"의 작업이 끝나면 주 프로그램이 관찰해야한다는 것입니다. 그러나 "관찰자"를 실제 IObservable으로 만들었으므로이 작업을 수행 할 방법이 없습니다. 이것은 사람들이 처음 Rx를 사용할 때 나타나는 혼란의 일반적인 원인입니다. "관찰자"를 관찰 가능 체인의 다른 링크로 모델링하면 주 프로그램에서 관찰 할 수 있습니다. 특히, "관찰자"는 들어오는 XML 데이터를 "완료된"메시지로 매핑하는 매핑 작업 (부작용이있는 것)에 불과합니다. 당신이 당신의 코드를 리팩토링 경우

그래서, 당신은 ... 당신이 원하는 것을 얻을 수

public class MyObserver 
{ 
    private readonly string _name; 

    public MyObserver(string name) { _name = name; } 

    public IObservable<Unit> Handle(IObservable<MyXmlDataFromWeb source) 
    { 
     return source.Select(value => 
     { 
      Thread.Sleep(5000); // simulate work 
      return Unit.Default; 
     }); 
    } 
} 

// main 
var endSignal = new Subject<Unit>(); 
var dataSource = Observable 
    .Interval(...) 
    .Select(...) 
    .TakeUntil(endSignal) 
    .Publish(); 
var observer1 = new MyObserver("Observer 1"); 
var observer2 = new MyObserver("Observer 2"); 
var results1 = observer1.Handle(dataSource.ObserveOn(...)); 
var results2 = observer2.Handle(dataSource.ObserveOn(...)); 
// since you just want to know when they are all done 
// just merge them. 
// use ToTask() to subscribe them and collect the results 
// as a Task 
var processingDone = results1.Merge(results2).Count().ToTask(); 

dataSource.Connect(); 

Console.WriteLine("Press any key to cancel ..."); 
Console.ReadLine(); 

// end the stream 
endSignal.OnNext(Unit.Default); 

// wait for the processing to complete. 
// use await, or Task.Result 
var numProcessed = await processingDone; 
당신이 긴 당신의 OnNext에서 작업을 차단 수행하는 경우 나, 당신은 수신의 정신에서 작동하지 않는 것을 주장
+0

입력 해 주셔서 대단히 감사드립니다. 그러나이 접근법은 답이없는 몇 가지 질문을 남겨 둡니다. 1. MyOberver의 Handle 메서드에서 실제 작업을 수행하면 onNext 핸들러를 대체 할 수 있지만이 접근 방법으로 onError 및 onCompleted 이벤트를 처리하는 방법은 무엇입니까? 그리고 onError/onCompleted가 수행 한 작업이 간격을 취소 한 후에 추가 작업을 진행하기 전에 실제로 완료되었는지 어떻게 알 수 있습니까? 물론 일회용 구독을 추가 할 수는 있지만 이전과 동일한 문제가 있습니다. ;-) – dotNZ

+0

2. MyObserver의 Handle 메서드에서 실제 작업을하는 것은 단점이 있습니다. processingDone.Wait()은 시퀀스의 모든 메시지가 처리 될 때까지 대기합니다. 이 동작은 바람직하지 않습니다!인터벌을 취소 한 직후 모든 미해결 메시지 (있는 경우)를 무시하거나 처리하기를 기대합니다. (현재 실행중인 onNext/onError/onCompleted의 완료 만 기다립니다). 내 초기 접근 방식 (잠금 사용)은 구독을 삭제하여이 동작을 달성합니다. 아이디어가 있으십니까? – dotNZ

+0

1 - MyObserver가'Select '뒤에 연산자를 추가하도록하십시오. 정확한 요구 사항에 따라 다양한 선택이 가능합니다. [.Do()] (http://msdn.microsoft.com/en-us/library/hh229830(v=103) .aspx)는 쉽게 찾을 수 있습니다. Rx는'processingDone' 작업이 완료되기 전에 작업이 완료되도록 보장합니다. – Brandon

관련 문제