A가 DB에 항목을 만들고 B가 rabbitMQ에 작업을 대기열에 넣은 다음 관찰 가능 항목 체인 A -> B가 있습니다. 내가하고 싶은 일은 B가 예외를 던지면 (DB에 입력 된 데이터를 삭제/되돌려 야합니다) 세 번째 C가 실행되는 것입니다. 그런 다음 예외가 호출자에게 전달되어야합니다. onErrorResumeNext 사용하고 있지만 명시 적 (비어있는) 구독자가있는 경우에만 작동하는 것 같습니다. 참고 : 아래 코드의 대리자는 A를 반환하고 queueSender는 B를 반환합니다.이 경우 또는 더 좋은 방법이 있습니까?Observable.onErrorResumeNext를 사용하여 실행을 분기하고 예외를 전파하는 방법
public Observable<Long> create(final Message m) {
return delegate.create(m).flatMap(new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(final Long aLong) {
Observable<Void> observableB = queueSender.observe(aLong)
observableB.onErrorResumeNext(new Func1<Throwable, Observable<Void>>() {
@Override
public Observable<Void> call(Throwable throwable) {
delegate.delete(aLong).toBlockingObservable().single();
return null;
}
}).subscribe(new VoidSubscriber());
return timelineObservable.map(new Func1<Void, Long>() {
@Override
public Long call(Void aVoid) {
return aLong;
}
});
}
});
}