2016-08-20 3 views
5

나는 이것을 달성하는 것이 다소 쉬워야한다고 생각하지만 문제를 해결하는 방법을 알아내는 데 어려움을 겪고있다.RxJS를 사용하여 일괄 처리 하시겠습니까?

내가 가진 것은 JSON 개체의 배열을 반환하는 API입니다. 이 객체들을 단계별로 조사해야하고, 각 객체에 대해 다른 AJAX 호출을해야합니다. 문제는 각 AJAX 호출을 처리하는 시스템이 한 번에 두 개의 활성 호출 만 처리 할 수 ​​있다는 것입니다 (CPU 사용량이 많은 데스크톱 응용 프로그램으로 연결되는 작업 임).

RxJS (버전 5 또는 4 사용)를 사용하여이 작업을 수행하는 방법이 궁금합니다.

편집 : 또한 동시에 진행하는 체인 체인을 가질 수 있습니까? 작동하지 않는 것 그러나

Rx.Observable.fromPromise(start()) 
    .concatMap(arr => Rx.Observable.from(arr)) 
    .concatMap(x => downloadFile(x)) 
    .concatMap((entry) => processFile(entry)) 
    .concatMap((entry) => convertFile(entry)) 
    .concatMap((entry) => UploadFile(entry)) 
    .subscribe(
     data => console.log('data', new Date().getTime(), data), 
     error => logger.warn('err', error), 
     complete => logger.info('complete') 
    ); 

: 즉

Downloading File: 1 Processing File: 1 Converting File: 1 Uploading File: 1 Downloading File: 2 Processing File: 2 Converting File: 2 Uploading File: 2 Downloading File: 3 Processing File: 3 Converting File: 3 Uploading File: 3

내가 좋아하는 일을 시도했습니다. 예를 들어, downloadFile은 processFile, convertFile 및 uploadFile이 모두 완료 될 때까지 기다리지 않고 이전 파일이 완료되는 즉시 다음 파일이 다시 실행됩니다.

+0

http://xgrommx.github.io/rx-book/content/observable/observable_instance_methods/subscribe.html – cport1

+0

@ cport1 - 연결된 예제는 비동기 작업 형식을 기다리지 않고 차례로 실행됩니다. 그건 내가 묻는 것과 다르다. – NRaf

답변

4

당신은 내부의 모든 약속을 해결해야합니다,이 접근법이다 단일 concatMap 메소드 (예 :

)
Rx.Observable.fromPromise(getJSONOfAjaxRequests()) 
    .flatMap(function(x) { return x;}) 
    .concatMap(function(item) { 
    return downloadFile(item) 
     .then(processFile) 
     .then(convertFile); 
    }) 
    .subscribe(function(data) { 
    console.log(data); 
    }); 

여기서 작업 plunkr을 참조하십시오. https://plnkr.co/edit/iugdlC2PpW3NeNF2yLzS?p=preview 이렇게하면 새 아약스 호출은 이전 작업이 완료 될 때만 전송됩니다.

다른 접근 방식은 파일이 요청을 병렬로 전송할 수 있지만 '다운로드, 처리, 변환, 업로드'작업은 순서대로 수행됩니다.이를 위해 당신은 plunkr 여기에서 볼

Rx.Observable.fromPromise(getJSONOfAjaxRequests()) 
    .flatMap(function(x) { return x;}) 
    .merge(2) // in case maximum concurrency required is 2 
    .concatMap(function(item) { 
    return downloadFile(item); 
    }) 
    .concatMap(function(item) { 
    return processFile(item); 
    }) 
    .concatMap(function(item) { 
    return convertFile(item) 
    }) 
    .subscribe(function(data) { 
    //console.log(data); 
    }); 

에 의해 작동하게 할 수 있습니다 https://plnkr.co/edit/mkDj6Q7lt72jZKQk8r0p?p=preview

+0

고마워요, 첫 번째 것은 제가 함께 간 것입니다. 두 번째 것이 이상적 일 것입니다. 병렬 작업의 양 (한 번에 최대 2)을 제한 할 수 있다면. – NRaf

+0

그런데이 첫 번째 솔루션은 rxjs5에서만 작동합니다. 버전 4를 사용하는 동작은 모두 병렬로 실행됩니다. – NRaf

+0

이상한! 이론적으로 동작은 rxjs4 및 rxjs5에서 동일하게 유지되어야합니다. – FarazShuja

1

어때? from을 사용하여 배열을 바이트 크기의 청크로 분리하고 concatMap을 사용하여 하나씩 처리 할 수 ​​있습니다.

function getArr() { 
    return Rx.Observable.of([1, 2, 3, 4, 5, 6, 7, 8]); 
} 


function processElement(element) { 
    return Rx.Observable.of(element) 
     .delay(500); 
} 


getArr() 
    .concatMap(arr => { 
     return Rx.Observable.from(arr); 
    }) 
    .concatMap(element => { 
     return processElement(element); 
    }) 
    .subscribe(res => { 
     console.log(res); 
    }); 
+0

이것은 유망 해 보이지만 그것을 확장 할 수 있습니까? 즉 다른 단계도 추가하고 싶다면? – NRaf

+0

연장 할 수 있다고 생각합니다. 그렇습니다. ConcatMaps를 추가하거나 필요한 다른 연산자를 계속 추가 할 수 있습니다. 여기에 좋은 정보가 많이 있습니다. http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html – jcolemang

2

당신은 maxConcurrency 과부하 (Rxjs의 V4)와 merge 연산자를 사용할 수 있습니다, 그래서 뭔가 같은 :

공식 문서 : 정확히이

Downloading File: 1 
Processing File: 1 
Converting File: 1 
Uploading File: 1 
Downloading File: 2 
Processing File: 2 
... 

같은 요청의 순서를 원하는 경우 여기