2016-08-29 7 views
2

서버에 대한 호출을 maxEntries의 최대 값으로 묶으려고하지만 maxWait ms보다 오래 기다리지 않으려 고합니다. 이전에는 RxJS 4에서 windowWithTimeOrCount()으로 사용할 수 있었지만 RxJS 5에서는 제거되었습니다.RxJS - maxWait 및 maxElements 창에 windowWhen() 사용

윈도우의 마지막 요소가 손실된다는 것을 제외하면 모든 것이 잘 작동합니다. 그리고 '잃어버린'말하기 - 그건 내가 지금 어떻게 느끼는지입니다. 밖에있는 RxJS 전문가가 내가 잘못한 것을 말해 줄 수 있니? (되지 않은 창) 손실 toggleSubject.next(null)if (count > (maxEntries))로 인해 트리거

private chunk(queue: Observable<CacheEntry>, maxEntries: number, maxWait: number): Observable<Observable<CacheEntry>> { 

    // We have an incoming stream of CacheEntries to be retrieved. We want to bundle these in units of max maxEntries 
    // but wait no longer than max maxWait ms. We return an Observable, that emits Observables of CacheEntries that 
    // complete after maxEntries/maxWait (whatever comes first). 
    const toggleSubject = new Subject<void>(); 

    return queue 

    // Start emitting a new Observable every time toggleSubject emits. 
    // (bufferWhen() wouldn't work as we have to count the elements as they come and buffer only gives us the 
    // complete collection) 
     .windowWhen(() => toggleSubject) 

     // map() is called once for every window (maxEntries/maxWait) 
     // the inner do() is called for every element in the window, allowing us to set up the timeout callback and to 
     // count all elements, then emitting on toggleSubject, triggering a new Observable. 
     // (We have to map() here - instead of an outer do() - because otherwise the original obs would be streamed 
     // and the hooked up version with the inner do() would never be called.) 
     .map((obs) => { 
     // counts the number of cacheEntries already in this stream 
     let count = 0; 
     // flag to kill the timeout callback 
     let done = false; 
     // we have to return an Observable 
     return obs.do(() => { 
      count++; 
      if (count === 1) { 
       // we start counting when the first element is streamed. 
       IntervalObservable.create(maxWait).first().subscribe(() => { 
       if (!done) { 
        //trigger due to maxWait 
        toggleSubject.next(null); 
       } 
       }); 
      } 
      if (count > (maxEntries)) { 
       done = true; 
       // trigger due due to maxEntries(' + maxEntries + ')'); 
       toggleSubject.next(null); 
      } 
      } 
     ); 
     }); 
    } 

요소입니다.

EDIT : maxTime은 새로운 Observable의 첫 번째 요소가 푸시되는 순간을 시작합니다. if (count === 1). 이것은 a) 창문이있는 Observables의 내부에서 map()으로 작업하는 이유이며 b)는 필수 동작이므로 중요합니다.

예 : maxElements : 100, maxWait : 100. 101 요소는 t = 99에서 푸시됩니다. 예상되는 행동 : t = 99 일 때, 100 개의 요소가있는 관찰 가능 (Observable)이 푸시됩니다. 1 요소가 남아 있습니다. 카운터 + 타이머 재설정. t = 199에서 두 번째 '청크'에 대한 카운터가 만료되고 Observable을 1 요소로 푸시합니다.

(여기서는 Brandons (것이다 않음) 코드를 볼 - I 올바르게 읽으면 - 하나 개의 원소와, t = 100, 100 개 원소 이상한 MS와 t = 99 피 감시를 피 감시를 누른다.)

답변

2

그래, 이런 부작용에 map을 사용하고 싶지는 않습니다. 너가주의 했는 것처럼, 너는 품목을 떨어 뜨리기 끝낸다.

다음은 내가 원하는대로 할 수있는 일반적인 방법입니다.

참고 : RXJS 5에는 현재이 오버로드에 대한 유형 정의가있는 issue가 있습니다. TypeScript에서 컴파일 할 수 있도록 몇 가지 타입 변환을 추가했습니다.

chunk<T>(queue: Observable<T>, maxEntries: number, maxWait: number): Observable<Observable<T>> { 
    // use publish() so that we can subscribe multiple times to the same stream of data. 
    return queue.publish(entries => { 
     // observable which will trigger after maxWait 
     const timer = IntervalObservable.create(maxWait); 
     // observable which will trigger after maxEntries 
     const limit = entries.take(maxEntries).last(); 
     // observable which will trigger on either condition 
     const endOfWindow = limit.takeUntil(timer); 

     // use endOfWindow to close each window. 
     return entries.windowWhen(() => endOfWindow) as Observable<T>; 
    }) as Observable<Observable<T>>; 
} 

편집 : 당신은 타이머가 첫 번째 항목은 각 창에 도착할 때까지 시작하지 않으려면

, 당신은 지금처럼 작업을 수행 할 수 있습니다

chunk<T>(queue: Observable<T>, maxEntries: number, maxWait: number): Observable<Observable<T>> { 
    // use publish() so that we can subscribe multiple times to the same stream of data. 
    return queue.publish(entries => { 
     // observable which will trigger after maxWait after the first 
     // item in this window arrives: 
     const timer = entries.take(1).delay(maxWait); 
     // observable which will trigger after maxEntries 
     const limit = entries.take(maxEntries).last(); 
     // observable which will trigger on either condition 
     const endOfWindow = limit.takeUntil(timer); 

     // use endOfWindow to close each window. 
     return entries.windowWhen(() => endOfWindow) as Observable<T>; 
    }) as Observable<Observable<T>>; 
} 
+0

차갑다. 감사. 좋은 생각이 드는 군 .- 나는 내일 이걸 시도 할거야. 내 접근 방식은 좀 더 복잡해 ... – RAlfoeldi

+0

음 ... 좋아 보이지만 트릭을하지는 마라. 'publish() Observable 을 반환합니다. Observable >이 반환됩니다. 내가 코드를 읽는 방식으로 타이머가 독립적으로 트리거합니다 (스트림에서 발생하는 상황에 관계없이 모든 maxWait ms). 이것이 내가 windowWhen()의 내부에서 피드백을 얻으려는 이유입니다. Observable - 처음 요소가 도착하면 계산을 시작합니다. (편집을 참조하십시오.) – RAlfoeldi

+0

이 오버로드는'publish (func : (Observable ) => Observable )입니다 : Observable '입니다. 다른 말로하면, publish()는 여러분이 제공하는 팩토리 메소드가 리턴하는 관찰 타입의 타입을 반환한다. 이 경우 Observable >을 반환합니다. – Brandon

0

솔루션을 비동기 스케줄러에서 windowWhen()을 토글하는 방법을 생각해 냈습니다. 마지막 값을받는 다운 스트림 연산자를 방지 -

if (count === (maxEntries)) { 
    done = true; 
    this.LOGGER.debug(' - trigger due due to maxEntries(' + maxEntries + ')'); 
    Rx.Scheduler.async.schedule(()=>toggleSubject.next(null)); 
} 

문제

windowWhen() 즉시 반환 Observables은 완료이었다.

죄송합니다. 질문에 답변 해 주셨습니다. 여기에 게시하기 전에 Rx.Scheduler.async 등을 시도했지만 어떻게 든 작동하지 않는 것 같습니다.

+0

비동기 호출이 실행되기 전에 항목이 모두 관찰 가능으로 푸시되고 있기 때문에 윈도우가 'maxEntries'항목 이상으로 끝날 수 있습니다. 이와 같은 작문 작업과 관련된 복잡성은 주제를 사용하지 않고 기존 운영자로부터 운영자를 구성하는 일반적인 조언 인 이유 중 하나입니다. – Brandon