2013-11-22 1 views
1

RX에 가장 적합하다고 생각되는 레거시 이벤트 기반 객체가 있습니다. 네트워크 소스에 연결 한 후 메시지 수신시 이벤트를 발생시키고 단일 오류로 종료 될 수 있습니다 (연결 금형 등) 또는 더 이상 메시지가 없다는 표시 (드물게) 이 객체에는 두 개의 예측이 있습니다. 대부분의 사용자는 수신 된 메시지의 하위 집합에만 관심이 있으므로 잘 알려진 메시지 하위 유형이 나타날 때만 발생하는 대체 이벤트가 있습니다. 이 떨어져 ... 어떻게 든,하지만 ... 느낌이 다른 곳에서 사용되는 방법에 대한IConnectableObservable에서 레거시 객체 래핑

class LegacyReactiveWrapper : IConnectableObservable<TopLevelMessage> 
{ 
    private LegacyType _Legacy; 
    private IConnectableObservable<TopLevelMessage> _Impl; 
    public LegacyReactiveWrapper(LegacyType t) 
    { 
     _Legacy = t; 
     var observable = Observable.Create<TopLevelMessage>((observer) => 
     { 
      LegacyTopLevelMessageHandler tlmHandler = (sender, tlm) => observer.OnNext(tlm); 
      LegacyErrorHandler errHandler = (sender, err) => observer.OnError(new ApplicationException(err.Message)); 
      LegacyCompleteHandler doneHandler = (sender) => observer.OnCompleted(); 

      _Legacy.TopLevelMessage += tlmHandler; 
      _Legacy.Error += errHandler; 
      _Legacy.Complete += doneHandler; 

      return Disposable.Create(() => 
      { 
       _Legacy.TopLevelMessage -= tlmHandler; 
       _Legacy.Error -= errHandler; 
       _Legacy.Complete -= doneHandler; 
      }); 
     }); 

     _Impl = observable.Publish(); 
    } 

    public IDisposable Subscribe(IObserver<TopLevelMessage> observer) 
    { 
     return _Impl.RefCount().Subscribe(observer); 
    } 

    public IDisposable Connect() 
    { 
     _Legacy.ConnectToMessageSource(); 
     return Disposable.Create(() => _Legacy.DisconnectFromMessageSource()); 
    } 

    public IObservable<SubMessageA> MessageA 
    { 
     get 
     { 
      // This is the moral equivalent of the projection behavior 
      // that already exists in the legacy type. We don't hook 
      // the LegacyType.MessageA event directly. 
      return _Impl.RefCount() 
        .Where((tlm) => tlm.MessageType == MessageType.MessageA) 
        .Select((tlm) => tlm.SubMessageA); 
     } 
    } 

    public IObservable<SubMessageB> MessageB 
    { 
     get 
     { 
      return _Impl.RefCount() 
        .Where((tlm) => tlm.MessageType == MessageType.MessageB) 
        .Select((tlm) => tlm.SubMessageB); 
     } 
    } 
} 

뭔가 :

그래서, 반응 프로그램에 대한 자세한 내용을 학습하는 과정에서, 나는 다음과 같은 래퍼를 만들었습니다. 작동하지만 이상하게 느껴지는 샘플 사용법은 다음과 같습니다. 내 테스트 응용 프로그램의 UI 컨텍스트는 WinForms이지만 실제로 중요하지는 않습니다.

// in Program.Main... 

MainForm frm = new MainForm(); 

// Updates the UI based on a stream of SubMessageA's 
IObserver<SubMessageA> uiManager = new MainFormUiManager(frm); 

LegacyType lt = new LegacyType(); 
// ... setup lt... 

var w = new LegacyReactiveWrapper(lt); 

var uiUpdateSubscription = (from msgA in w.MessageA 
          where SomeCondition(msgA) 
          select msgA).ObserveOn(frm).Subscribe(uiManager); 

var nonUiSubscription = (from msgB in w.MessageB 
         where msgB.SubType == MessageBType.SomeSubType 
         select msgB).Subscribe(
          m => Console.WriteLine("Got MsgB: {0}", m), 
          ex => Console.WriteLine("MsgB error: {0}", ex.Message), 
          () => Console.WriteLine("MsgB complete") 
         ); 

IDisposable unsubscribeAtExit = null; 
frm.Load += (sender, e) => 
{ 
    var connectionSubscription = w.Connect(); 
    unsubscribeAtExit = new CompositeDisposable(
           uiUpdateSubscription, 
           nonUiSubscription, 
           connectionSubscription); 
}; 

frm.FormClosing += (sender, e) => 
{ 
    if(unsubscribeAtExit != null) { unsubscribeAtExit.Dispose(); } 
}; 


Application.Run(frm); 

이 작품 - 양식 출시의 UI 업데이트, 내가 그것을 닫을 때 구독 청소 얻을합니다 (LegacyType의 네트워크 연결이 연결되어있는 경우는하지 않을 것이다) 프로세스가 종료됩니다. 엄밀히 말하면, 단지 connectionSubscription을 처리하면 충분합니다. 그러나 명시적인 Connect은 나에게 이상하다고 느낀다. RefCount가 당신을 위해 어떻게해야되기 때문에, 차라리 MessageAMessageB_Impl.RefCount를 사용하여 명시 적으로 나중에 연결하는 것보다, 내가 대신 this.RefCount을 사용하고 Load 핸들러에 Subscribe로 통화를 이동하는 래퍼는 수정했습니다. 다른 문제가있었습니다. 두 번째 구독으로 LegacyReactiveWrapper.Connect이라는 또 다른 호출이 시작되었습니다. 나는 Publish/RefCount 뒤에있는 아이디어가 "선입 선출 연결, 최후의 연결을 처리합니다."라고 생각했습니다.

  1. 나는 근본적으로 Publish/RefCount 오해 마십시오

    는 내가 세 가지 질문을 가지고 추측?

  2. 을 구현하는 데 선호되는 방법은 IObservable<T>.Publish을 통해 얻은 위임과 관련이 없습니다. 나는 IObservable<T> 자신을 구현하기로되어 있지 않다는 것을 알고 있지만, Observable.Create().Publish()이 제공하는 IConnectableObservable<T>에 연결 논리를 주입하는 방법을 이해하지 못합니다. Connect은 멱등수입니까?
  3. RX/반응 형 프로그래밍에 익숙한 사람이라면 래퍼 사용법에 대한 샘플을보고 "그게 못 생겼고 고장났다"고 말하는가? 아니면 이상하게 보이지 않을까?

답변

2

나는 당신이 직접 연결을 노출해야하는지 확신하지 못합니다. 나는 캡슐화 된 구현 세부 사항으로 Publish().RefCount()을 사용하여 다음과 같이 단순화 할 것이다. 레거시 연결은 필요한 경우에만 수행됩니다. 여기서 첫 번째 가입자가 연결을 끊고 마지막 가입자가 연결을 끊습니다. 또한 모든 가입자간에 RefCount을 올바르게 공유하는 반면 메시지 유형별로 RefCount을 사용한다는 점에 유의하십시오. 이는 의도 한 바가 아닙니다. 사용자가 명시 적으로 연결 할 필요가 없습니다 :

public class LegacyReactiveWrapper 
{ 
    private IObservable<TopLevelMessage> _legacyRx; 

    public LegacyReactiveWrapper(LegacyType legacy) 
    { 
     _legacyRx = WrapLegacy(legacy).Publish().RefCount(); 
    } 

    private static IObservable<TopLevelMessage> WrapLegacy(LegacyType legacy) 
    { 
     return Observable.Create<TopLevelMessage>(observer => 
     { 
      LegacyTopLevelMessageHandler tlmHandler = (sender, tlm) => observer.OnNext(tlm); 
      LegacyErrorHandler errHandler = (sender, err) => observer.OnError(new ApplicationException(err.Message)); 
      LegacyCompleteHandler doneHandler = sender => observer.OnCompleted(); 

      legacy.TopLevelMessage += tlmHandler; 
      legacy.Error += errHandler; 
      legacy.Complete += doneHandler; 
      legacy.ConnectToMessageSource(); 

      return Disposable.Create(() => 
      { 
       legacy.DisconnectFromMessageSource(); 
       legacy.TopLevelMessage -= tlmHandler; 
       legacy.Error -= errHandler; 
       legacy.Complete -= doneHandler; 
      }); 
     }); 
    } 

    public IObservable<TopLevelMessage> TopLevelMessage 
    { 
     get 
     { 
      return _legacyRx; 
     } 
    } 

    public IObservable<SubMessageA> MessageA 
    { 
     get 
     { 
      return _legacyRx.Where(tlm => tlm.MessageType == MessageType.MessageA) 
          .Select(tlm => tlm.SubMessageA); 
     } 
    } 

    public IObservable<SubMessageB> MessageB 
    { 
     get 
     { 
      return _legacyRx.Where(tlm => tlm.MessageType == MessageType.MessageB) 
          .Select(tlm => tlm.SubMessageB); 
     } 
    } 
} 

추가 관찰이 가입자 수 때 Publish().RefCount()가 나는 경우에도 가입을 유지해야 할 때 일반적으로 나는 단지이 선택을 통해 Connect을 사용하여 0에 도달 기본 가입을 드롭하는 것입니다 게시 된 소스의 구독자 수가 0이됩니다 (나중에 다시 백업 될 수 있음). 드물지만 연결을 할 필요가 없을 때 연결 자원을 유지하는 것보다 연결이 더 많이 필요할 때만이 작업을 수행해야합니다.

+0

구현 세부 사항으로'Publish(). RefCount()'에 +1. 너무 많은 시간이 지나면 기본적으로이 작업을 수행하지 않는 이벤트 래퍼가 표시됩니다. Rx가 구독을 관리하고 있으므로 중복 이벤트 처리기를 기존 이벤트에 추가하여 중복되지 않도록하십시오! :) –

1
  1. 이해 완전히 잘못되지 않습니다,하지만 당신은 오해의 몇 가지 포인트를 가지고있는 것 같습니다 않습니다.

    동일한 소스 IObservable에서 RefCount을 여러 번 호출하면 공유 참조 횟수가 발생한다는 신념하에있는 것으로 보입니다. 그들은하지 않습니다; 각 인스턴스는 자체 카운트를 유지합니다. 따라서 _Impl에 대한 여러 구독 (호출 당 하나씩 구독 또는 메시지 속성 호출)이 발생합니다.

    당신은 또한 (당신이 당신의 소모 코드에서 연결을 호출 할 필요가 놀랄 것 때문에) _Impl를 만드는 것은 IConnectableObservable이 어떻게 든 Connect 메소드가 호출되도록 것으로 예상 할 수있다. 모두 Publish은 게시 된 개체의 구독자가됩니다 (.Publish() 호출)을 사용하여 원본 소스 observable (이 경우 Observable.Create 호출로 만든 객체)에 대한 단일 구독을 공유합니다.

    일반적으로 위에서 설명한 공유 구독 효과를 얻으려면 Publish 및 RefCount를 즉시 함께 사용합니다 (예 : source.Publish().RefCount()). 연결을 호출하여 원본 소스에 대한 구독을 시작하지 않고도 추운 관측자를 뜨겁게 만들 수 있습니다. 그러나 이것은 위에서 언급 한 것처럼 모든 구독자에 대해 .Publish(). RefCount()에서 반환 된 동일한 개체를 사용하는 데 의존합니다.

  2. Connect의 구현이 합리적인 것처럼 보입니다. Connect가 멱등수 (Idempotent) 여야한다는 것에 대한 권장 사항은 모르지만 개인적으로 기대하지는 않습니다. 원한다면 올바른 밸런스를 얻기 위해 반환 값을 처리하는 호출을 추적해야합니다.

    레거시 개체에 여러 이벤트 처리기가 연결되지 않도록해야하는 이유가없는 한 게시 방법을 사용할 필요가 없습니다. 이를 피할 필요가 있다면 _Impl을 일반 IObservable으로 변경하고 PublishRefCount으로 따르는 것이 좋습니다.

  3. MessageAMessageB 속성은 IObservable을 반환하지만 메시지 수신을 시작하기 위해 기본 개체에서 Connect를 호출해야하므로 사용자에게 혼동의 원천이 될 수 있습니다. 필자는 IConnectableObservables로 변경하여 원래 연결 (어떤 시점에서 멱시성 논의가 더 관련성이 높아짐)에 위임하거나 속성을 삭제하고 필요할 때 사용자가 (매우 간단한) 투영을 스스로하도록 허용합니다.

+0

RefCount가 어떻게 작동하는지 오해 한 것이 핵심 문제였습니다. Idempotence의 문제는 실제로 그것이 어떻게 작동 했는가보다는 그것이 효과가 있다고 생각한 방식으로 작동한다면 필요한 해결 방법과 관련이 있습니다. 유효한 포인트가 3) 있습니다. Publish()와의 연결 공유가 어떻게 작동하는지 그리고 같은 스트림에 올라가고 있음에도 불구하고 "A 만"또는 "B 만"을 염려하는 코드가 많기 때문에 두 가지를 모두 포함 시켰습니다. – twon33

관련 문제