2015-01-26 3 views
3

나는 약간의 시간이 소요되는 네트워크를하지 실행 방법은 IO 스레드에서 호출이RxJava 배압 (빠른 생산 속도가 느린 소비자)

예를 들어 다음

/** 
* network call 
* @param value 
* @return 
*/ 
private Observable<Integer> execute(final int value) { 
    return Observable.create(new Observable.OnSubscribe<Integer>() { 
     @Override 
     public void call(Subscriber<? super Integer> subscriber) { 

      try { 
       Thread.sleep(500); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      System.out.println("done " + value); 
      subscriber.onNext(value); 
      subscriber.onCompleted(); 
     } 
    }).subscribeOn(Schedulers.io()); 
} 

전에서 실행해야합니다 "명령"의 목록을 가지고 주문. (잇달아)는

started 0 
started 1 
started 2 
started 3 
started 4 
done 0 
done 1 
done 2 
done 4 
done 3 

Produses 빠른

을 소모보다 내고 이러한 방식

public List<Integer> testObservableBackpressure(){ 
    return Observable.range(0,5).flatMap(new Func1<Integer, Observable<Integer>>() { 
     @Override 
     public Observable<Integer> call(Integer integer) { 
      System.out.println("started " + integer); 
      return exeute(integer); 
     } 
    }).toList().toBlocking().single(); 
} 

이다 (Observable.range은 (X는 Y) 명령의리스트를 나타낸다)

내가 원하는

started 0 
done 0 
started 1 
done 1 
started 2 
done 2 
... 
같은 결과 17,451,515,

하지만 ..

public List<Integer> testObservableBackpressure(){ 

    return Observable.create(new Observable.OnSubscribe<Integer>() { 
     @Override 
     public void call(final Subscriber<? super Integer> subscriber) { 
      Observable.range(0,5).subscribe(new Subscriber<Integer>() { 

       @Override 
       public void onStart() { 
        request(1); 
       } 

       @Override 
       public void onCompleted() { 
        subscriber.onCompleted(); 
       } 

       @Override 
       public void onError(Throwable e) { 
        subscriber.onError(e); 
       } 

       @Override 
       public void onNext(Integer integer) { 
        System.out.println("started " + integer); 
        execute(integer).subscribe(new Action1<Integer>() { 
         @Override 
         public void call(Integer integer) { 
          subscriber.onNext(integer); 
          request(1); 
         } 
        }); 
       } 
      }); 
     } 
    }).toList().toBlocking().single(); 
} 

예상대로 결과가 이런 식으로

started 0 
done 0 
started 1 
done 1 
started 2 
done 2 
started 3 
done 3 
started 4 

이 문제를 처리하기 위해 다른 더 우아한 방법이 있는지 내 질문이 될 것이다? 난 당신의 코드가 실행될 때 얻을

답변

1

여기서 구체적인 백 프레셔 전략이 필요한 것은 확실하지 않습니다. 그냥 concatMap을 사용하십시오.

flatMap 대신 concatMap을 사용하는 경우 concatMap에서 방출 된 마지막 Observable이 완료되면 각각의 새로운 입력 값이 구독됩니다. 후드 아래에서 concatMapSerialSubscription을 사용합니다. 그게 당신에게 당신이 원하는 주문을 줄 것입니다.

+0

Perfect! 감사. 결과 예상대로 – Palaima

1

출력 다음 "완료"메시지 순서가 있음을

started 0 
started 1 
started 2 
started 3 
started 4 
done 1 
done 3 
done 4 
done 2 
done 0 

알 수 있습니다. 이는 코드가 기본적으로 각 호출의 실행을 execute과 병렬 처리하기 때문입니다. Observable.range에 의해 방출되는 각 항목에 대해 flatMapObservableIOScheduler으로 실행됩니다. 따라서 모든 항목은 별도의 스레드에서 병렬로 처리되므로 항목을 순서대로 유지하고 올바르게 삽입 할 수 없습니다. 당신이 subscribeOn 연산자를 호출 할 경우 유일한 차이점은

import rx.Observable; 
import rx.Subscriber; 
import rx.functions.Func1; 
import rx.schedulers.Schedulers; 

import java.util.List; 

public class Test { 
    private Observable<Integer> execute(final int value) { 
     return Observable.create(new Observable.OnSubscribe<Integer>() { 
      @Override 
      public void call(Subscriber<? super Integer> subscriber) { 

       try { 
        Thread.sleep(500); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
       System.out.println("done " + value); 
       subscriber.onNext(value); 
       subscriber.onCompleted(); 
      } 
     }); 
    } 

    public List<Integer> testObservableBackpressure(){ 
     return Observable.range(0, 5).flatMap(new Func1<Integer, Observable<Integer>>() { 
      @Override 
      public Observable<Integer> call(Integer integer) { 
       System.out.println("started " + integer); 
       return execute(integer); 
      } 
     }).subscribeOn(Schedulers.io()).toList().toBlocking().single(); 
    } 

    public static void main(String[] args) { 
     new Test().testObservableBackpressure(); 
    } 
} 

공지 것을 : 원하는 동작을 달성하기위한 하나의 옵션은 모든 항목 (대신 자체에 각 항목의) ​​같은 IOScheduler에서 실행 있는지 확인하는 것입니다. 이 코드는 다음과 같은 결과를 출력합니다.

started 0 
done 0 
started 1 
done 1 
started 2 
done 2 
started 3 
done 3 
started 4 
done 4 
+0

답변을 주셔서 감사합니다. 해결책 중 하나입니다. 하지만 대부분 1 명령을 실행해야하므로 보통은 명령 (execute) 만 호출하므로 io 스레드에서 실행해야하지만 경우에 따라 명령 목록을 실행하고 실행을 동기화해야합니다. 해결책은 좋지만 모든 실행 (명령)을 추가해야합니다 .subscribeOn (Schedulers.io()) – Palaima

관련 문제