여러 관찰자와 관찰 가능한 간격을 실행하는 응용 프로그램이 있습니다. 간격이 0.5 초마다 웹 서버에서 일부 XML 데이터를로드하면 관찰자는 백그라운드 스레드에서 응용 프로그램 특정 처리를 수행합니다. 데이터가 더 이상 필요하지 않으면 구독 및 간격 관찰이 처리되므로 관찰자의 OnNext/OnCompleted/OnError는 더 이상 호출되지 않습니다. 여태까지는 그런대로 잘됐다.잠금 장치를 사용하지 않고 Rx 관찰자가 완료 될 때까지 기다리십시오.
내 문제 : 드물게 내 Observer의 OnNext 메서드를 호출 한 후에도 여전히 실행 중일 수 있습니다! 폐기 후 추가 작업을 진행하기 전에 OnNext가 완료되었는지 확인하고 싶습니다.
내 현재 해결 방법 : 내 관찰자 클래스 (코드 참조)에 보관함 필드를 도입했습니다. 처리를 마친 후 잠금을 획득하고 잠금을 획득 한 후에 만 계속 시도합니다. 이 솔루션이 작동하는 동안 (?), 그것은 어떻게 든 저에게 잘못 느끼는 것입니다.
질문 :이 문제를 해결하기 위해보다 우아하고 "Rx Way"가 있습니까?
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace RxExperimental
{
internal sealed class MyXmlDataFromWeb
{
public string SomeXmlDataFromWeb { get; set; }
}
internal sealed class MyObserver : IObserver<MyXmlDataFromWeb>
{
private readonly object _locker = new object();
private readonly string _observerName;
public MyObserver(string observerName) {
this._observerName = observerName;
}
public object Locker {
get { return this._locker; }
}
public void OnCompleted() {
lock (this._locker) {
Console.WriteLine("{0}: Completed.", this._observerName);
}
}
public void OnError(Exception error) {
lock (this._locker) {
Console.WriteLine("{0}: An error occured: {1}", this._observerName, error.Message);
}
}
public void OnNext(MyXmlDataFromWeb value) {
lock (this._locker) {
Console.WriteLine(" {0}: OnNext running on thread {1}... ", this._observerName, Thread.CurrentThread.ManagedThreadId);
Console.WriteLine(" {0}: XML received: {1}", this._observerName, value.SomeXmlDataFromWeb);
Thread.Sleep(5000); // simulate some long running operation
Console.WriteLine(" {0}: OnNext running on thread {1}... Done.", this._observerName, Thread.CurrentThread.ManagedThreadId);
}
}
}
internal sealed class Program
{
private static void Main() {
const int interval = 500;
//
var dataSource = Observable.Interval(TimeSpan.FromMilliseconds(interval), NewThreadScheduler.Default).Select(_ => {
var data = new MyXmlDataFromWeb {
SomeXmlDataFromWeb = String.Format("<timestamp>{0:yyyy.MM.dd HH:mm:ss:fff}</timestamp>", DateTime.Now)
};
return data;
}).Publish();
//
var observer1 = new MyObserver("Observer 1");
var observer2 = new MyObserver("Observer 2");
//
var subscription1 = dataSource.ObserveOn(NewThreadScheduler.Default).Subscribe(observer1);
var subscription2 = dataSource.ObserveOn(NewThreadScheduler.Default).Subscribe(observer2);
//
var connection = dataSource.Connect();
//
Console.WriteLine("Press any key to cancel ...");
Console.ReadLine();
//
subscription1.Dispose();
subscription2.Dispose();
connection.Dispose();
//
lock (observer1.Locker) {
Console.WriteLine("Observer 1 completed.");
}
lock (observer2.Locker) {
Console.WriteLine("Observer 2 completed.");
}
//
Console.WriteLine("Can only be executed, after all observers completed.");
}
}
}
콜백. Rx는 효과적으로 콜백의 파이프 라인이므로 소스 제작자를 차단하게됩니다. 대신에 메시지 전달 디자인을 고려해 보거나 OnNext 처리기가 비동기의 또 다른 계층을 도입하는 모습을 보길 원할 수도 있습니다 (중첩 된 관찰 가능한 시퀀스 참조). –
또한 IObserver을 구현하지 않을 것을 제안합니다 (또는 IObservable ) 인터페이스. 대신 연산자를 사용하여 쿼리를 작성하십시오. –