8

저는 Rx.NET에 다소 익숙합니다. 구독자가 던져 낼 수있는 예외를 잡을 수 있습니까? 다음을 가져 가십시오 ...Subscription OnNext 액션에서 던질 수있는 예외 캐치

handler.FooStream.Subscribe(
      _ => throw new Exception("Bar"), 
      _ => { }); 

현재 다음 인스턴스를 사용하여 구독 단위로 파악하고 있습니다. 구현은 대기 스레드를 깨우기 위해 ManualResetEvent를 사용합니다. 더 좋은 방법이있을 것처럼

public interface IExceptionCatcher 
{ 
    Action<T> Exec<T>(Action<T> action); 
} 

등처럼 사용 ...

handler.FooStream.Subscribe(
      _exceptionCatcher.Exec<Foo>(_ => throw new Exception("Bar")), //It's disappointing that this generic type can't be inferred 
      _ => { }); 

는 느낌. 오류 관찰을 다루기 위해 Rx.NET의 모든 오류 처리 기능이 있습니까?

편집 : 요청에 따라 구현은 https://gist.github.com/1409829 (인터페이스 및 구현은 제품 코드에서 다른 어셈블리로 구분됨)입니다. 피드백을 환영합니다. 어리석은 것처럼 보일지 모르지만 성 윈저를 사용하여 다양한 Rx 구독자를 관리하고 있습니다.

var exceptionCatcher = 
    new ExceptionCatcher(e => 
           { 
            Logger.FatalException(
             "Exception caught, shutting down.", e); 
            // Deal with unmanaged resources here 
           }, false); 


/* 
* Normally the code below exists in some class managed by an IoC container. 
* 'catcher' would be provided by the container. 
*/ 
observable /* do some filtering, selecting, grouping etc */ 
    .SubscribeWithExceptionCatching(processItems, catcher); 

답변

8

내장 관찰 가능한 사업자 당신이 일을하지 않습니다이 예외 포수는 observable가 IObservable의 인스턴스입니다 그것은 다음과 같이 사용되는이

windsorContainer.Register(Component.For<IExceptionCatcher>().Instance(catcher)); 

같은 컨테이너에 등록 기본적으로 요청 (이벤트와 비슷 함)하지만이를 수행 할 확장 방법을 만들 수 있습니다.

public static IObservable<T> IgnoreObserverExceptions<T, TException>(
           this IObservable<T> source 
           ) where TException : Exception 
{ 
    return Observable.CreateWithDisposable<T>(
     o => source.Subscribe(
      v => { try { o.OnNext(v); } 
        catch (TException) { } 
      }, 
      ex => o.OnError(ex), 
      () => o.OnCompleted() 
      )); 
} 

관찰 가능한 모든 요소를이 메서드로 래핑하면 설명 된 동작을 얻을 수 있습니다.

+0

감사합니다. 내 질문에 답변했지만 OnNext를 둘러보고 예외를 잡을 수 있습니까? 반환 된 IObservable을 사용하면 가입 된 코드가 다른 스레드에서 실행되는 것을 매우 쉽게 처리 할 수 ​​있습니다. 원래는 Subject.OnNext 호출을 제외하고 try/catch를 시도했지만 예외는 발견되지 않았습니다. 그러나 SubscribeWithExceptionHandling 메서드 또는 뭔가를 만들 수 있습니다. – drstevens

+1

@drstevens 그것은 같은 스레드에서 예외를 잡을 것입니다. 관찰자가 예외를 던지는 자체 비동기 작업을 시작하면이 작업을 포착하지 않습니다. –

+1

얼마나 많은 Rx 연산 결과가 새로운 스레드 (또는 풀의 작업)가되는지 생각해 보면, 그럴 가능성이 매우 높다고 말할 수 있습니다. 문제의'handler.FooStream'와'Subscribe' 사이에는'GroupByUntil (...). SelectMany (...). Buffer (시간 함께)'가 있습니다. 실제로 Exception을 잡아 내고 OnError 핸들러에 전달 된 것과 동일한 액션을 사용하는 예제에 따라 SubscribeWithCatch를 작성했습니다. – drstevens