당신은 정기적으로 n
항목 당을 요청합니다 사용자 정의 가입자와 결합 rx.Observable#onBackpressureBuffer()
를 사용하여 시도 할 수 둘째. 그러나, 당신은 하드 샘플에 바인딩 될 것입니다.
참고.subscribeOn()
및 .toBlocking()
은 주 방법을 즉시 종료하지 못하게하는 것입니다.
public class BackpressureTest {
public static void main(final String[] args) {
Observable.range(1, 1000)
.compose(Observable::onBackpressureBuffer) // consume source immediately, but buffer it
.lift(allowPerSecond(3)) // via operator using custom subscriber request n items per second
.subscribeOn(Schedulers.computation())
.toBlocking()
.subscribe(System.out::println);
}
private static <T> Observable.Operator<T, T> allowPerSecond(final int n) {
return upstream -> periodicallyRequestingSubscriber(upstream, n);
}
private static <T> Subscriber<T> periodicallyRequestingSubscriber(final Subscriber<T> upstream, final int n) {
return new Subscriber<T>() {
@Override
public void onStart() {
request(0); // request 0 so that source stops emitting
Observable.interval(1, SECONDS).subscribe(x -> request(n)); // every second request n items
}
@Override
public void onCompleted() {
upstream.onCompleted();
}
@Override
public void onError(final Throwable e) {
upstream.onError(e);
}
@Override
public void onNext(final T integer) {
upstream.onNext(integer);
}
};
}
}
'delay' 또는'throttleFirst (throttleLast)'를 원하십니까? 후자는 항목을 너무 빨리 받으면 항목을 삭제합니다. – zsxwing