2016-10-01 2 views
1

windowCount를 사용하여 관찰 가능 값을 그룹으로 그룹화하려고하고 있으며 각 그룹의 각 값에 대해 요청을 보냅니다.
그런 다음 현재 그룹의 요청이 완료되기 전에 다음 그룹의 요청이 시작되지 않도록 해당 그룹을 연결하십시오.
문제는 일부 값을 건너 뜁니다.

여기 내 코드가 있습니다.
(실제 Ajax 호출을하지는 않지만 Observable.timer가 예제로 작동해야 함).windowCount 값을 내림

Observable.interval(300) 
    .take(12) 
    .windowCount(3) 
    .concatMap(obs => { 
     return obs.mergeMap(
      v => Observable.timer(Math.random() * 1500).mapTo(v) 
     ); 
    }) 
    .do(v => console.log(v)) 
    .finally(() => console.log('fin')) 
    .subscribe(); 

그룹을 수동으로 생성하여 대체하려고했습니다. 그리고 그것은 완벽하게 작동합니다. 값을 건너 뛰지 않습니다.

Observable.interval(900) 
    .take(4) 
    .map(i => Observable.interval(300).take(3).map(j => j + i * 3)) 
    .concatMap(obs => { 
     return obs.mergeMap(
      v => Observable.timer(Math.random() * 1500).mapTo(v) 
     ); 
    }) 
    .do(v => console.log(v)) 
    .finally(() => console.log('fin')) 
    .subscribe(); 

windowCount가 방출 된 값을 같은 방식으로 그룹화해야한다는 인상을 받았습니다.
하지만 분명히 다른 작업을 수행합니다.

그 동작에 대한 설명에 정말 감사 할 것입니다.

감사합니다.

답변

0

누락 된 값은 사용하기 위해 저장하지 않는 값을 계속해서 출력하는 열 관찰 가능 (Observable.interval(300))을 사용한 결과입니다.

다음은 숫자가 방출되는 시간을 기록하는 코드의 약간 단순화 된 버전입니다. 출력이 결정적이되도록 을 1으로 바꿨습니다.

https://jsbin.com/burocu/edit?js,console

Observable.interval(300) 
    .do(x => console.log(x + ") hot observable at: " + (x * 300 + 300))) 
    .take(12) 
    .windowCount(3) 
    .do(observe3 => {observe3.toArray() 
     .subscribe(x => console.log(x + " do window count at: " + (x[2] * 300 + 300)));}) 
    .concatMap(obs => { 
     return obs.mergeMap(
      v => Observable.timer(1 * 1500).mapTo(v) 
     ) 
     .do(v => console.log(v + " merge map at: " + (v * 300 + 300 + 1500))); 
    }) 
    .finally(() => console.log('fin windowCount')) 
    .subscribe(); 

그것은 아래의 출력 결과 : 당신이 밖으로 시도하는 나는 또한 jsbin의 코드를로드 한. 뜨겁게 관측 된 것들은 다른 작업자들이 여전히 처리되고있는 동안 행진합니다.

이것은 값을 삭제한다고 생각하는 것입니다. windowCount(3)을 수행하고있는 것을 볼 수 있습니다. 이라고 생각하면이라고 생각하면라고 생각했습니다.

"0) hot observable at: 300" 
"1) hot observable at: 600" 
"2) hot observable at: 900" 
"0,1,2 do window count at: 900" 
"3) hot observable at: 1200" 
"4) hot observable at: 1500" 
"5) hot observable at: 1800" 
"3,4,5 do window count at: 1800" 
"0 merge map at: 1800" 
"6) hot observable at: 2100" 
"1 merge map at: 2100" 
"7) hot observable at: 2400" 
"2 merge map at: 2400" 
"8) hot observable at: 2700" 
"6,7,8 do window count at: 2700" 
"9) hot observable at: 3000" 
"10) hot observable at: 3300" 
"11) hot observable at: 3600" 
"9,10,11 do window count at: 3600" 
" do window count at: NaN" 
"8 merge map at: 4200" 
"fin windowCount" 

편집 : 추가 설명 ...

windowCount(3)concatMap에 대한 호출이 있습니다. concatMapmapconcatAll의 조합입니다.

concatAll

:

직렬 방식으로, 소스 (관찰 가능한 고차 )에 의해 방출 된 모든 관찰 가능한 조인. Observable이 이전의 내부 Observable이 완료된 후에 만 ​​각각 Observable 을 구독하고 은 모든 값을 반환 된 관찰 가능 항목에 병합합니다.

위의 결과를 보면 첫 번째 windowCount(3) 값 [0,1,2]은 1800에서 2400 사이에 출력된다는 것을 알 수 있습니다.제 windowCount(3) 값 [3,4,5]가 1800 concatAll에서 방출

공지 [3,4,5]가 방출되는 경우에 가입 할 준비가되지 아직 이전 관찰 가능한 내부가 완료되지 때문이다. 따라서이 값은 효과적으로 삭제됩니다.

다음에, 이전의 관찰 가능한 내부 [0,1,2]가 표시 2400

다음 값 concatAll 구독 2400에서 완료 2700의 값이 8 (300ms를 구독이 시작된 후에 알 2400). 그 다음, 8의 값은 4200에서 mergeMap에 의해 출력되고, 이는 2400의 가입 시작 포인트로부터 300의 간격 지연과 1500 (즉, 2400 + 300 + 1500 = 4200)의 타이머 지연 때문에 4200에서 출력된다.

이 시점 이후에 시퀀스가 ​​완료되므로 더 이상 값이 출력되지 않습니다.

자세한 설명이 필요하면 의견을 추가하십시오.