2014-11-14 4 views
6

데이터베이스 커서에서 빠른 스트림으로부터 데이터를 생성하는 관찰 가능 기능이 있습니다. 초당 x 항목의 속도로 출력을 조절하려고합니다. 이것은 잘 작동속도 제한 관찰 가능

observable.map(f -> { 
ratelimiter.acquire(); // configured limiter to only allow 
}); 

하지만 그냥 사용하여 배압을 처리하는 더 나은 방법이 호기심 : 지금까지 나는이 문서에 설명 된대로 호출 스택 차단을 사용하고있어?

TKS

+0

'delay' 또는'throttleFirst (throttleLast)'를 원하십니까? 후자는 항목을 너무 빨리 받으면 항목을 삭제합니다. – zsxwing

답변

2

당신은 정기적으로 n 항목 당을 요청합니다 사용자 정의 가입자와 결합 rx.Observable#onBackpressureBuffer()를 사용하여 시도 할 수 둘째. 그러나, 당신은 하드 샘플에 바인딩 될 것입니다.

참고.subscribeOn().toBlocking()은 주 방법을 즉시 종료하지 못하게하는 것입니다.

public class BackpressureTest { 

    public static void main(final String[] args) { 
    Observable.range(1, 1000) 
     .compose(Observable::onBackpressureBuffer) // consume source immediately, but buffer it 
     .lift(allowPerSecond(3)) // via operator using custom subscriber request n items per second 
     .subscribeOn(Schedulers.computation()) 
     .toBlocking() 
     .subscribe(System.out::println); 
    } 

    private static <T> Observable.Operator<T, T> allowPerSecond(final int n) { 
    return upstream -> periodicallyRequestingSubscriber(upstream, n); 
    } 

    private static <T> Subscriber<T> periodicallyRequestingSubscriber(final Subscriber<T> upstream, final int n) { 
    return new Subscriber<T>() { 

     @Override 
     public void onStart() { 
     request(0); // request 0 so that source stops emitting 
     Observable.interval(1, SECONDS).subscribe(x -> request(n)); // every second request n items 
     } 

     @Override 
     public void onCompleted() { 
     upstream.onCompleted(); 
     } 

     @Override 
     public void onError(final Throwable e) { 
     upstream.onError(e); 
     } 

     @Override 
     public void onNext(final T integer) { 
     upstream.onNext(integer); 
     } 
    }; 
    } 
} 
0

@michalsamek의 답변은 옳았지만 배압은 Flowables에서만 작동합니다. 구독자를 수정하여 요청한 내용을 처리합니다.

다른 시간에 버스트 (bursts)로 사용할 때 약간의 문제가있었습니다.

private static <T> FlowableOperator<T, T> allowPerMillis(int millis) { 
    return observer -> new PeriodicallyRequestingSubscriber<>(observer, millis); 
} 


Observable.range(1, 100) 
    .observeOn(Schedulers.io()) 
    .toFlowable(BackpressureStrategy.BUFFER) 
    .compose(Flowable::onBackpressureBuffer) 
    .lift(allowPerMillis(200)) 
    .subscribe(value -> System.out.println(System.currentTimeMillis() % 10_000 + " : " + value)); 



public class PeriodicallyRequestingSubscriber<T> implements Subscriber<T> { 

    private final Subscriber<T> upstream; 

    private final int millis; 

    // If there hasn't been a request for a long time, do not flood 
    private final AtomicBoolean shouldRequest = new AtomicBoolean(true); 

    public PeriodicallyRequestingSubscriber(Subscriber<T> upstream, int millis) { 
     this.upstream = upstream; 
     this.millis = millis; 
    } 

    @Override 
    public void onSubscribe(Subscription subscription) { 
     Observable 
       .interval(millis, TimeUnit.MILLISECONDS) 
       .subscribe(x -> { 
        if (shouldRequest.getAndSet(false)) 
         subscription.request(1); 
       }); 
} 

@Override 
public void onNext(T t) { 
    shouldRequest.set(true); 
    upstream.onNext(t); 
} 

@Override 
public void onError(Throwable throwable) { 
    upstream.onError(throwable); 
} 

@Override 
public void onComplete() { 
    upstream.onComplete(); 
} 
}