2009-11-20 4 views
37

Reactive Extensions에는 기존 이벤트와 비동기 작업을 관찰 가능으로 바꾸기위한 많은 도우미 방법이 있지만 IObservable <T>을 처음부터 구현하는 방법은 무엇입니까?처음부터 IObservable <T> 구현하기

IEnumerable에는 구현하기가 매우 쉬운 lovely yield 키워드가 있습니다.

IObservable을 구현하는 적절한 방법은 무엇입니까 <T>?

스레드 안전성에 대해 걱정할 필요가 있습니까?

특정 동기화 컨텍스트에서 다시 호출되는 것에 대한 지원이 있다는 것을 알고 있지만 IObservable 인 것입니다. <T> 작성자가 걱정할 필요가 있거나 어떻게 든 내장되어 있습니까?

갱신 :

여기에 브라이언의 F 번호 솔루션의 내 C# 버전입니다

using System; 
using System.Linq; 
using Microsoft.FSharp.Collections; 

namespace Jesperll 
{ 
    class Observable<T> : IObservable<T>, IDisposable where T : EventArgs 
    { 
     private FSharpMap<int, IObserver<T>> subscribers = 
       FSharpMap<int, IObserver<T>>.Empty; 
     private readonly object thisLock = new object(); 
     private int key; 
     private bool isDisposed; 

     public void Dispose() 
     { 
      Dispose(true); 
     } 

     protected virtual void Dispose(bool disposing) 
     { 
      if (disposing && !isDisposed) 
      { 
       OnCompleted(); 
       isDisposed = true; 
      } 
     } 

     protected void OnNext(T value) 
     { 
      if (isDisposed) 
      { 
       throw new ObjectDisposedException("Observable<T>"); 
      } 

      foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value)) 
      { 
       observer.OnNext(value); 
      } 
     } 

     protected void OnError(Exception exception) 
     { 
      if (isDisposed) 
      { 
       throw new ObjectDisposedException("Observable<T>"); 
      } 

      if (exception == null) 
      { 
       throw new ArgumentNullException("exception"); 
      } 

      foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value)) 
      { 
       observer.OnError(exception); 
      } 
     } 

     protected void OnCompleted() 
     { 
      if (isDisposed) 
      { 
       throw new ObjectDisposedException("Observable<T>"); 
      } 

      foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value)) 
      { 
       observer.OnCompleted(); 
      } 
     } 

     public IDisposable Subscribe(IObserver<T> observer) 
     { 
      if (observer == null) 
      { 
       throw new ArgumentNullException("observer"); 
      } 

      lock (thisLock) 
      { 
       int k = key++; 
       subscribers = subscribers.Add(k, observer); 
       return new AnonymousDisposable(() => 
       { 
        lock (thisLock) 
        { 
         subscribers = subscribers.Remove(k); 
        } 
       }); 
      } 
     } 
    } 

    class AnonymousDisposable : IDisposable 
    { 
     Action dispose; 
     public AnonymousDisposable(Action dispose) 
     { 
      this.dispose = dispose; 
     } 

     public void Dispose() 
     { 
      dispose(); 
     } 
    } 
} 

편집 : 폐기 두 번

+1

Wes Dyer는 이제 Channel9에서 이러한 인터페이스에 대한 계약서에 관한 비디오를 제공합니다. – Benjol

+1

(30 초 후 ... http://channel9.msdn.com/posts/J.Van.Gogh/Reactive-Extensions-API-in-depth-Contract/) – Benjol

+0

쿨 - 그것을보아야합니다 :) –

답변

10

정직하게 말해서,이 모든 것이 '옳다'는 것이 확실하지 않지만, 지금까지 경험 한 바에 따르면 꽤 좋은 느낌입니다. F # 코드입니다.하지만 맛을 느낄 수 있기를 바랍니다. 소스 객체를 새로 만들 수 있습니다.이 객체는 Next/Completed/Error on을 호출 할 수 있으며, 소스 또는 클라이언트가 나쁜 일을 할 때 구독을 관리하고 Assert를 시도합니다.

type ObservableSource<'T>() =  // ' 
    let protect f = 
     let mutable ok = false 
     try 
      f() 
      ok <- true 
     finally 
      Debug.Assert(ok, "IObserver methods must not throw!") 
      // TODO crash? 
    let mutable key = 0 
    // Why a Map and not a Dictionary? Someone's OnNext() may unsubscribe, so we need threadsafe 'snapshots' of subscribers to Seq.iter over 
    let mutable subscriptions = Map.empty : Map<int,IObserver<'T>> // ' 
    let next(x) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun() -> v.OnNext(x))) 
    let completed() = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun() -> v.OnCompleted())) 
    let error(e) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun() -> v.OnError(e))) 
    let thisLock = new obj() 
    let obs = 
     { new IObservable<'T> with  // ' 
      member this.Subscribe(o) = 
       let k = 
        lock thisLock (fun() -> 
         let k = key 
         key <- key + 1 
         subscriptions <- subscriptions.Add(k, o) 
         k) 
       { new IDisposable with 
        member this.Dispose() = 
         lock thisLock (fun() -> 
          subscriptions <- subscriptions.Remove(k)) } } 
    let mutable finished = false 
    // The methods below are not thread-safe; the source ought not call these methods concurrently 
    member this.Next(x) = 
     Debug.Assert(not finished, "IObserver is already finished") 
     next x 
    member this.Completed() = 
     Debug.Assert(not finished, "IObserver is already finished") 
     finished <- true 
     completed() 
    member this.Error(e) = 
     Debug.Assert(not finished, "IObserver is already finished") 
     finished <- true 
     error e 
    // The object returned here is threadsafe; you can subscribe and unsubscribe (Dispose) concurrently from multiple threads 
    member this.Value = obs 

나는 여기에서 좋거나 나쁜 것에 대한 어떤 생각에도 관심이있을 것이다.

  • 관찰 가능한에 가입하는 사람들이 구독 던져해서는 안 : 아직 devlabs에서 모든 새로운 수신 물건을 볼 수있는 기회 ...

    내 자신의 경험

    이 제안이 없었어요. 관찰자가 가입자가 던질 때 할 수있는 일은 합당하지 않습니다. (이는 이벤트와 유사합니다.) 대부분 예외는 최상위 수준의 catch-all 처리기로 버블 링되거나 앱을 다운시킵니다.
  • 소스는 "논리적으로 단일 스레드"여야합니다. 동시 OnNext 호출에 반응 할 수있는 클라이언트를 작성하는 것이 더 어려울 수도 있습니다. 각 개별 호출이 다른 스레드에서 오는 경우에도 동시 호출을 피하는 것이 도움이됩니다.
  • 일부 '계약'을 시행하는 기본/도우미 클래스를 갖는 것이 유용합니다.

사람들이이 줄을 따라 더 구체적인 조언을 표시 할 수 있는지 궁금합니다. 이 구현에 관한 한 발언

+1

감사합니다. C#에서 비슷한 것을 만들 때 균열이 생겨서 열거 중에 잠금을 피하기 위해 F # Map 컬렉션을 사용했습니다. 또 다른 옵션은 Eric Lippert의 Immutable AVLTree와 같은 것을 사용하는 것입니다. 나는 이벤트가 적절한 컨텍스트에서 수신되도록하고 관찰 할 수있는 이벤트가 매번 동일한 스레드에서 발생해야한다는 것을 관찰자의 책임이라고 확신했습니다. –

2
  1. 균열 개방 호출하면 경우 ObjectDisposedException 던지지 마십시오 반사경과보세요.

  2. 시계 일부 C9 비디오 - this 하나는 당신이 '파생'선택 '콤비'

  3. 비밀은 AnonymousObservable, AnonymousObserver 및 AnonymousDisposable 클래스를 만드는 것입니다가, (단지 사실을 차선책으로 작동 할 수있는 방법을 보여줍니다 인터페이스를 인스턴스화 할 수 없다는 것). Actions 및 Funcs로 전달하면 0 구현을 포함합니다. 예를 들어

:

public class AnonymousObservable<T> : IObservable<T> 
{ 
    private Func<IObserver<T>, IDisposable> _subscribe; 
    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe) 
    { 
     _subscribe = subscribe; 
    } 

    public IDisposable Subscribe(IObserver<T> observer) 
    { 
     return _subscribe(observer); 
    } 
} 

난 당신이 나머지를 해결 드리겠습니다 ... 그것은 이해에 매우 좋은 운동입니다.

관련 질문이있는 here의 멋진 작은 스레드가 있습니다.

+1

감사하지만 정말 도움이되지 않습니다. 나는 반사경과 대부분의 C9 비디오를 이미 봤다. 리플렉터는 실제 구현만을 보여주고 스레딩과 관련된 규칙을 추론하는 것은 매우 어렵습니다. 또한 당신의 소위 비밀은 실제적인 관찰 가능한 클래스에서 올바른 구현의 책임을 제공된 Func로 푸시합니다. Func를 구현하기위한 규칙을 밝히지 않습니다. 그래서 기본적으로 당신은 나 자신이 나머지를 알아내는 것 외에는 아무것도 말하지 않았습니다. –

+1

Point taken.솔직히 말해서, 지금까지의 대부분의 노력은 실제 소스와는 달리 '결합 자'라고 부르는 것을 작성하려고했습니다. 내 질문에 대한 답변 (여기에서 '공식'답변을 얻는 가장 좋은 장소)에서 몇 가지 지침을 수집 할 수 있습니다. http://social.msdn.microsoft.com/Forums/en-US/rx/thread/79402dd3 -009a-46db-9b55-06482e8cad0e – Benjol

2

는 : 동시 컬렉션은 .NET의 FW 4에 도입 된 후

은 간단한 사전 대신 ConcurrentDictioary를 사용하는 것이 좋을 것입니다.

컬렉션에 처리 잠금을 저장합니다.

adi.

6

예, yield 키워드는 멋지다. 아마도 IObservable (OfT)과 비슷한 것이있을 것입니다. [편집 :.에서 에릭 Meijer에의 PDC '09 talk 그가 관찰 가능한 생성을위한 선언적 수율에 "예,이 공간을보고"말한다]

가 (대신 자신의 압연) 가까운 무언가를 들어, "(not yet) 101 Rx Samples"위키의 the bottom을 확인, 팀은 IObservable (OfT)을 구현하기 위해 Subject (T) 클래스를 "백엔드"로 사용할 것을 제안합니다.

public class Order 
{    
    private DateTime? _paidDate; 

    private readonly Subject<Order> _paidSubj = new Subject<Order>(); 
    public IObservable<Order> Paid { get { return _paidSubj.AsObservable(); } } 

    public void MarkPaid(DateTime paidDate) 
    { 
     _paidDate = paidDate;     
     _paidSubj.OnNext(this); // Raise PAID event 
    } 
} 

private static void Main() 
{ 
    var order = new Order(); 
    order.Paid.Subscribe(_ => Console.WriteLine("Paid")); // Subscribe 

    order.MarkPaid(DateTime.Now); 
} 
+0

IMHO'Subject'는 자신의 관찰 대상을 생성하고자 할 때 분명히 적절한 방법입니다. –

+2

AsyncSubject 은 향후 구독자에게 마지막 값을 유지하기 때문에 더 나은 선택입니다. 귀하의 예에서는 실제 유료 이벤트가 발생하기 전에 구독 할 수 있습니다. – Nappy

+0

@ 기저귀 :'AsyncSubject '에 대해 몰랐다. –

11

official documentation이 IObservable 자신을 구현하는 사용자를 deprecates : 여기에 그 예입니다. 대신 사용자는 팩토리 메서드를 사용해야합니다. Observable.Create

가능하면 기존 연산자를 조합하여 새 연산자를 구현하십시오. 그들의 구현을하지 않은 이유를 모르겠어요

public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe) 
{ 
    if (subscribe == null) 
    { 
     throw new ArgumentNullException("subscribe"); 
    } 
    return new AnonymousObservable<TSource>(subscribe); 
} 

: 그렇지 않으면 Observable.Create은 반응성의 내부 클래스 AnonymousObservable 주변의 사소한 래퍼임을 발생 Observable.Create

를 사용하여 사용자 정의 연산자를 구현 대중들,하지만 어쨌든.

+0

수정. 'IObservable '또는'IObserver '을 직접 구현하지 마십시오. –

+0

안녕하세요. RX에 관한 책, 이전 블로그를 좋아하십시오. 공식 문서보다 훨씬 좋은 안내서입니다. –

+1

건배. Rx가 현재 오픈 소스이므로 팀이 공식 코드/문서를 업데이트 할 수 있도록 노력하겠습니다. –