2017-04-10 2 views
2

내 각형 어플리케이션은 websocket을 사용하여 백엔드와 통신합니다.WebSocket에서 관찰 할 수있는 RxJ

제 테스트 케이스에는 2 개의 클라이언트 구성 요소가 있습니다. Observable 타이머는 두 개의 다른 클라이언트 ID를 예상대로 인쇄합니다.

각 ngOnInit()은 해당 클라이언트의 ID도 인쇄합니다.

지금 어떤 이유로 든 websocketService.observeClient()의 가입이 각 메시지에 대해 2 회 호출되지만 this.client.id은 항상 두 번째 클라이언트의 값을 인쇄합니다.

Heres는 내 클라이언트 구성 요소

@Component({ 
... 
}) 
export class ClientComponent implements OnInit { 

    @Input() client: Client; 

    constructor(public websocketService: WebsocketService) { 
    Observable.timer(1000, 1000).subscribe(() => console.log(this.client.id)); 
    } 

    ngOnInit() { 

    console.log(this.client.id); 
    this.websocketService.observeClient().subscribe(data => { 
     console.log('message', this.client.id); 
    }); 

    } 

} 

그리고 내 웹 소켓 서비스

@Injectable() 
export class WebsocketService { 

    private observable: Observable<MessageEvent>; 
    private observer: Subject<Message>; 

    constructor() { 

    const socket = new WebSocket('ws://localhost:9091'); 

    this.observable = Observable.create(
     (observer: Observer<MessageEvent>) => { 
     socket.onmessage = observer.next.bind(observer); 
     socket.onerror = observer.error.bind(observer); 
     socket.onclose = observer.complete.bind(observer); 
     return socket.close.bind(socket); 
     } 
    ); 

    this.observer = Subject.create({ 
     next: (data: Message) => { 
     if (socket.readyState === WebSocket.OPEN) { 
      socket.send(JSON.stringify(data)); 
     } 
     } 
    }); 

    } 

    observeClient(): Observable<MessageEvent> { 
    return this.observable; 
    } 

} 

편집

좋아, 나는 그것이 그 사실과 관련이있다 읽고 멀리로 Observables은 유니 캐스트 개체이며이를 위해 제목을 사용해야하지만 제목을 만드는 방법을 알지 못합니다.

+0

당신에게 문제가 당신의'providers' 구성하지 않을 확실 해요? 귀하의 설명을 보면, 각 클라이언트 구성 요소가 'WebsocketService' 인스턴스를 소유하고 싶어하는 것처럼 보입니다. – martin

+0

백엔드에 연결된 하나의 주입 된 websocketService가 없어야합니다. – Pascal

답변

1

share 연산자를 사용하여 구독자간에 공유해야합니다.

this.observable = Observable.create(
    (observer: Observer<MessageEvent>) => { 
     socket.onmessage = observer.next.bind(observer); 
     socket.onerror = observer.error.bind(observer); 
     socket.onclose = observer.complete.bind(observer); 
     return socket.close.bind(socket); 
    } 
).share(); 

또한이 서비스가 싱글 톤인지 확인하십시오.

문서 : 당신이 사용할 수있는 https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/share.md

0

rxjs 5로 내장 당신을위한 주제를 만들어 웹 소켓 기능. 또한 오류가 발생한 후 스트림에 다시 가입하면 다시 연결됩니다. 이 답변을 참조하십시오 :

https://stackoverflow.com/a/44067972/552203

TLDR :

let subject = Observable.webSocket('ws://localhost:8081'); 
subject 
    .retry() 
    .subscribe(
     (msg) => console.log('message received: ' + msg), 
     (err) => console.log(err), 
    () => console.log('complete') 
    ); 
subject.next(JSON.stringify({ op: 'hello' })); 
관련 문제