process.stdin
또는 fs.createReadStream
과 같은 노드 js 스트림이있는 경우 RxJs5를 사용하여이 스트림을 RxJs Observable 스트림으로 변환하려면 어떻게해야합니까?노드가 읽을 수있는 스트림을 RX 관찰 가능으로 변환하는 방법
나는 RxJs-Node에 fromReadableStream
메서드가 있지만 1 년 내에 업데이트되지 않은 것 같습니다.
process.stdin
또는 fs.createReadStream
과 같은 노드 js 스트림이있는 경우 RxJs5를 사용하여이 스트림을 RxJs Observable 스트림으로 변환하려면 어떻게해야합니까?노드가 읽을 수있는 스트림을 RX 관찰 가능으로 변환하는 방법
나는 RxJs-Node에 fromReadableStream
메서드가 있지만 1 년 내에 업데이트되지 않은 것 같습니다.
들어 마크의 추천에 따라 이것을 찾는 사람은 I adapted rx-node fromStream
implementation for rxjs5입니다.
import { Observable } from 'rxjs';
// Adapted from https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52
export default function fromStream(stream, finishEventName = 'end', dataEventName = 'data') {
stream.pause();
return new Observable((observer) => {
function dataHandler(data) {
observer.next(data);
}
function errorHandler(err) {
observer.error(err);
}
function endHandler() {
observer.complete();
}
stream.addListener(dataEventName, dataHandler);
stream.addListener('error', errorHandler);
stream.addListener(finishEventName, endHandler);
stream.resume();
return() => {
stream.removeListener(dataEventName, dataHandler);
stream.removeListener('error', errorHandler);
stream.removeListener(finishEventName, endHandler);
};
}).share();
}
작업중 인 작업에서이 작업을 수행 한 이후로 테스트를 해본 적은 없지만 다른 사람이 작업하고 있다면이 대답을 받아 들일 것입니다. – JuanCaicedo
나는 그것을 사용하고 있으며 지금까지는 잘 작동하고있다. 나는 그것이 단위 테스트를하지 않았다고 생각했다. –
RxJs 노드 구현 RxJs4을 기반으로하지만, 많은 작업없이 RxJs5에 이식 할 수 https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52
감사합니다. 그렇다면 전환을 구현해야하는 것처럼 들리십니까? – JuanCaicedo
rxjs5 버전이 아직없는 것 같습니다. 예 –
V4 및 V5 (면책 조항 검증되지 않은) 모두에 대해 다음과 같은 작업을해야합니다 :
fromStream: function (stream, finishEventName, dataEventName) {
stream.pause();
finishEventName || (finishEventName = 'end');
dataEventName || (dataEventName = 'data');
return Observable.create(function (observer) {
// This is the "next" event
const data$ = Observable.fromEvent(stream, dataEventName);
// Map this into an error event
const error$ = Observable.fromEvent(stream, 'error')
.flatMap(err => Observable.throw(err));
// Shut down the stream
const complete$ = Observable.fromEvent(stream, finishEventName);
// Put it all together and subscribe
const sub = data$
.merge(error$)
.takeUntil(complete$)
.subscribe(observer);
// Start the underlying node stream
stream.resume();
// Return a handle to destroy the stream
return sub;
})
// Avoid recreating the stream on duplicate subscriptions
.share();
},
그래도 작동합니까? 누가 작동하는지 얼마나 자주 업데이트하는지 궁금합니다. – smnbbrv
@smnbbrv 의심의 여지없이 정상적으로 작동하지만 RxJS4이며 RxJS5와 호환되지 않습니다. – cartant
[소스] (https://github.com/Reactive-Extensions/rx-node/blob/master/index.js#L45-L83)를보고 직접 변환하는 방법을 확인하십시오. - 구현은 매우 작습니다. – cartant