2017-01-08 1 views
3

process.stdin 또는 fs.createReadStream과 같은 노드 js 스트림이있는 경우 RxJs5를 사용하여이 스트림을 RxJs Observable 스트림으로 변환하려면 어떻게해야합니까?노드가 읽을 수있는 스트림을 RX 관찰 가능으로 변환하는 방법

나는 RxJs-NodefromReadableStream 메서드가 있지만 1 년 내에 업데이트되지 않은 것 같습니다.

+0

그래도 작동합니까? 누가 작동하는지 얼마나 자주 업데이트하는지 궁금합니다. – smnbbrv

+0

@smnbbrv 의심의 여지없이 정상적으로 작동하지만 RxJS4이며 RxJS5와 호환되지 않습니다. – cartant

+2

[소스] (https://github.com/Reactive-Extensions/rx-node/blob/master/index.js#L45-L83)를보고 직접 변환하는 방법을 확인하십시오. - 구현은 매우 작습니다. – cartant

답변

5

들어 마크의 추천에 따라 이것을 찾는 사람은 I adapted rx-node fromStream implementation for rxjs5입니다.

import { Observable } from 'rxjs'; 

// Adapted from https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52 
export default function fromStream(stream, finishEventName = 'end', dataEventName = 'data') { 
    stream.pause(); 

    return new Observable((observer) => { 
    function dataHandler(data) { 
     observer.next(data); 
    } 

    function errorHandler(err) { 
     observer.error(err); 
    } 

    function endHandler() { 
     observer.complete(); 
    } 

    stream.addListener(dataEventName, dataHandler); 
    stream.addListener('error', errorHandler); 
    stream.addListener(finishEventName, endHandler); 

    stream.resume(); 

    return() => { 
     stream.removeListener(dataEventName, dataHandler); 
     stream.removeListener('error', errorHandler); 
     stream.removeListener(finishEventName, endHandler); 
    }; 
    }).share(); 
} 
+0

작업중 인 작업에서이 작업을 수행 한 이후로 테스트를 해본 적은 없지만 다른 사람이 작업하고 있다면이 대답을 받아 들일 것입니다. – JuanCaicedo

+0

나는 그것을 사용하고 있으며 지금까지는 잘 작동하고있다. 나는 그것이 단위 테스트를하지 않았다고 생각했다. –

2

V4 및 V5 (면책 조항 검증되지 않은) 모두에 대해 다음과 같은 작업을해야합니다 :

fromStream: function (stream, finishEventName, dataEventName) { 
    stream.pause(); 

    finishEventName || (finishEventName = 'end'); 
    dataEventName || (dataEventName = 'data'); 

    return Observable.create(function (observer) { 

     // This is the "next" event 
     const data$ = Observable.fromEvent(stream, dataEventName); 

     // Map this into an error event 
     const error$ = Observable.fromEvent(stream, 'error') 
     .flatMap(err => Observable.throw(err)); 

     // Shut down the stream 
     const complete$ = Observable.fromEvent(stream, finishEventName); 

     // Put it all together and subscribe 
     const sub = data$ 
     .merge(error$) 
     .takeUntil(complete$) 
     .subscribe(observer); 

     // Start the underlying node stream 
     stream.resume(); 

     // Return a handle to destroy the stream 
     return sub; 
    }) 

    // Avoid recreating the stream on duplicate subscriptions 
    .share(); 
    }, 
관련 문제