2014-09-03 1 views
0

A가 DB에 항목을 만들고 B가 rabbitMQ에 작업을 대기열에 넣은 다음 관찰 가능 항목 체인 A -> B가 있습니다. 내가하고 싶은 일은 B가 예외를 던지면 (DB에 입력 된 데이터를 삭제/되돌려 야합니다) 세 번째 C가 실행되는 것입니다. 그런 다음 예외가 호출자에게 전달되어야합니다. onErrorResumeNext 사용하고 있지만 명시 적 (비어있는) 구독자가있는 경우에만 작동하는 것 같습니다. 참고 : 아래 코드의 대리자는 A를 반환하고 queueSender는 B를 반환합니다.이 경우 또는 더 좋은 방법이 있습니까?Observable.onErrorResumeNext를 사용하여 실행을 분기하고 예외를 전파하는 방법

public Observable<Long> create(final Message m) { 
    return delegate.create(m).flatMap(new Func1<Long, Observable<Long>>() { 
     @Override 
     public Observable<Long> call(final Long aLong) { 
      Observable<Void> observableB = queueSender.observe(aLong) 

      observableB.onErrorResumeNext(new Func1<Throwable, Observable<Void>>() { 
       @Override 
       public Observable<Void> call(Throwable throwable) { 
        delegate.delete(aLong).toBlockingObservable().single(); 
        return null; 
       } 
      }).subscribe(new VoidSubscriber()); 

      return timelineObservable.map(new Func1<Void, Long>() { 
       @Override 
       public Long call(Void aVoid) { 
        return aLong; 
       } 
      }); 
     } 
    }); 
} 

답변

0

VoidSubscriber를 제거하는 쉬운 방법은 observableB를 timelineObservable로 압축하는 것입니다. onErrorResumeNext()가 오류를 삼키기 때문에 오류를 전달하기 위해 onErrorResumeNext()를 doOnError()로 변경해야합니다. 수정 된 코드는 다음과 같습니다.

public Observable<Long> createOld(final Message m) { 
    return delegate.create(m).flatMap(new Func1<Long, Observable<Long>>() { 
     @Override 
     public Observable<Long> call(final Long aLong) { 
      final Observable<Void> observableB = 
        queueSender 
          .observe(aLong) 
          .doOnError(
            new Action1<Throwable>() { 
             @Override 
             public void call(Throwable throwable) { 
              delegate.delete(aLong).toBlockingObservable().single(); 
             } 
            } 
          ); 

      final Observable<Long> observableTimeline = 
        timelineObservable 
          .map(new Func1<Void, Long>() { 
           @Override 
           public Long call(Void aVoid) { 
            return aLong; 
           } 
          }); 

      return Observable 
        .zip(
          observableB, 
          observableTimeline, 
          new Func2<Void, Long, Long>() { 
           @Override 
           public Long call(Void aVoid, Long aLong) { 
            return aLong; 
           } 
          } 
        ); 
     } 
    }); 
} 
관련 문제