2016-10-25 6 views
0

와 rxjava 스트림을 단순화하는 방법은 다음 코드가 : 나는 모든 flatMaptimeout를 추가 한 후 자신을 반복하지 않으제한 시간

Observable.just(1) 
    .flatMap(-> doSomething1) 
    .timeout(10, SECONDS) 
    .flatMap(-> doSomething2) 
    .timeout(10, SECONDS) 
    .flatMap(-> doSomething3) 
    .timeout(10, SECONDS) 
    .flatMap(-> doSomething4) 
    .timeout(10, SECONDS) 
    .subscribe(); 

합니다. 내 첫 번째 생각은 스트림의 시작 부분이나 끝 부분에만 timeout을 적용하는 것이 었습니다.하지만 이는 더 가깝게 관찰 할 수있는 시간 만 적용하기 때문에 의도 한 동작이 아닙니다.

Observable.just(1) 
    .flatMap(-> doSomething1) 
    .flatMap(-> doSomething2) 
    .flatMap(-> doSomething3) 
    .flatMap(-> doSomething4) 
    .timeout(10, SECONDS) 
    .subscribe(); 

Observable.just(1) 
    .timeout(10, SECONDS) 
    .flatMap(-> doSomething1) 
    .flatMap(-> doSomething2) 
    .flatMap(-> doSomething3) 
    .flatMap(-> doSomething4) 
    .subscribe(); 

doSomethingX 기능

자체가 타임 아웃에 랩 될 필요하지 않는 다음 관찰을 반환하기 전에 시간이 걸릴 수 호출에 약간의 코드를 실행합니다.

어떻게 개선 할 수 있습니까?

업데이트 :

O 아래 실제 예. 이데아는 실패 나 타임 아웃시 재 시도 할 수있는 스트림을 작성하는 것입니다. 시나리오 중 하나가 시간 초과 된 시나리오를 시뮬레이션 중이지만 다시 시도합니다.

@Test 
public void streamToBeSimplified() throws Exception { 
    final AtomicBoolean retry = new AtomicBoolean(true); 

    Action1<Object> print = new Action1<Object>() { 
     @Override 
     public void call(Object o) { 
      System.out.println(" >>>" + o); 
     } 
    }; 

    Observable.just(1) 
      .doOnNext(print) 
      .flatMap(new Func1<Integer, Observable<Integer>>() { 
       @Override 
       public Observable<Integer> call(Integer integer) { 
        return Observable.just(2); 
       } 
      }) 
      .timeout(1, TimeUnit.SECONDS) 
      .doOnNext(print) 
      .flatMap(new Func1<Object, Observable<Integer>>() { 
       @Override 
       public Observable<Integer> call(Object o) { 

        if(retry.getAndSet(false)) { 
         try { 
          Thread.sleep(2000L); 
         } catch (InterruptedException e) { 
          e.printStackTrace(); 
         } 
        } 
        return Observable.just(3); 
       } 
      }) 
      .timeout(1, TimeUnit.SECONDS) 
      .doOnNext(print) 
      .retry(2) 
      .subscribe(); 

} 
+0

가 doSomethingX)는 관찰 가능한 또는 doSomethingX으로하는 방법 - 전화 (? –

+0

더 좋은 예가 추가되었습니다 –

답변

1

당신은이 같은 도우미 메서드를 만들 수 있습니다

private Observable doThings() { 
    return Observable.just(1) 
     .flatMap(__ -> withTimeout(doSomething1, 10, TimeUnit.SECONDS)) 
     .flatMap(__ -> withTimeout(doSomething2, 10, TimeUnit.SECONDS)); 
     // etc 
} 

private static <T> Observable<T> withTimeout(Observable<T> observable, long time, TimeUnit timeUnit) { 
    return observable 
      .timeout(time, timeUnit); 
} 
+0

더 좋은 예가 추가되었습니다. 'flatMap'은 실제로'Func1'을받습니다. 따라서'Observable'을 감싸는 것이 없습니다. 아마 내가이 작곡을 생각하는 방식을 전환해야 할 것입니다. –