서버에 대한 호출을 maxEntries의 최대 값으로 묶으려고하지만 maxWait ms보다 오래 기다리지 않으려 고합니다. 이전에는 RxJS 4에서 windowWithTimeOrCount()
으로 사용할 수 있었지만 RxJS 5에서는 제거되었습니다.RxJS - maxWait 및 maxElements 창에 windowWhen() 사용
윈도우의 마지막 요소가 손실된다는 것을 제외하면 모든 것이 잘 작동합니다. 그리고 '잃어버린'말하기 - 그건 내가 지금 어떻게 느끼는지입니다. 밖에있는 RxJS 전문가가 내가 잘못한 것을 말해 줄 수 있니? (되지 않은 창) 손실 toggleSubject.next(null)
if (count > (maxEntries))
로 인해 트리거
private chunk(queue: Observable<CacheEntry>, maxEntries: number, maxWait: number): Observable<Observable<CacheEntry>> {
// We have an incoming stream of CacheEntries to be retrieved. We want to bundle these in units of max maxEntries
// but wait no longer than max maxWait ms. We return an Observable, that emits Observables of CacheEntries that
// complete after maxEntries/maxWait (whatever comes first).
const toggleSubject = new Subject<void>();
return queue
// Start emitting a new Observable every time toggleSubject emits.
// (bufferWhen() wouldn't work as we have to count the elements as they come and buffer only gives us the
// complete collection)
.windowWhen(() => toggleSubject)
// map() is called once for every window (maxEntries/maxWait)
// the inner do() is called for every element in the window, allowing us to set up the timeout callback and to
// count all elements, then emitting on toggleSubject, triggering a new Observable.
// (We have to map() here - instead of an outer do() - because otherwise the original obs would be streamed
// and the hooked up version with the inner do() would never be called.)
.map((obs) => {
// counts the number of cacheEntries already in this stream
let count = 0;
// flag to kill the timeout callback
let done = false;
// we have to return an Observable
return obs.do(() => {
count++;
if (count === 1) {
// we start counting when the first element is streamed.
IntervalObservable.create(maxWait).first().subscribe(() => {
if (!done) {
//trigger due to maxWait
toggleSubject.next(null);
}
});
}
if (count > (maxEntries)) {
done = true;
// trigger due due to maxEntries(' + maxEntries + ')');
toggleSubject.next(null);
}
}
);
});
}
요소입니다.
EDIT : maxTime은 새로운 Observable의 첫 번째 요소가 푸시되는 순간을 시작합니다. if (count === 1)
. 이것은 a) 창문이있는 Observables의 내부에서 map()
으로 작업하는 이유이며 b)는 필수 동작이므로 중요합니다.
예 : maxElements : 100, maxWait : 100. 101 요소는 t = 99에서 푸시됩니다. 예상되는 행동 : t = 99 일 때, 100 개의 요소가있는 관찰 가능 (Observable)이 푸시됩니다. 1 요소가 남아 있습니다. 카운터 + 타이머 재설정. t = 199에서 두 번째 '청크'에 대한 카운터가 만료되고 Observable을 1 요소로 푸시합니다.
(여기서는 Brandons (것이다 않음) 코드를 볼 - I 올바르게 읽으면 - 하나 개의 원소와, t = 100, 100 개 원소 이상한 MS와 t = 99 피 감시를 피 감시를 누른다.)
차갑다. 감사. 좋은 생각이 드는 군 .- 나는 내일 이걸 시도 할거야. 내 접근 방식은 좀 더 복잡해 ... – RAlfoeldi
음 ... 좋아 보이지만 트릭을하지는 마라. 'publish() Observable을 반환합니다. Observable >이 반환됩니다. 내가 코드를 읽는 방식으로 타이머가 독립적으로 트리거합니다 (스트림에서 발생하는 상황에 관계없이 모든 maxWait ms). 이것이 내가 windowWhen()의 내부에서 피드백을 얻으려는 이유입니다. Observable - 처음 요소가 도착하면 계산을 시작합니다. (편집을 참조하십시오.) –
RAlfoeldi
이 오버로드는'publish (func : (Observable) => Observable )입니다 : Observable '입니다. 다른 말로하면, publish()는 여러분이 제공하는 팩토리 메소드가 리턴하는 관찰 타입의 타입을 반환한다. 이 경우 Observable >을 반환합니다. –
Brandon