2017-02-07 2 views
1

RxJava를 통해 요청에 대한 응답을 듣고 싶을 때가 있습니다. 문제는 Observable을 설정하여 이벤트를 수신하고 올바른 순서로 구독시 메시지를 보내는 방법을 잘 모르는 것입니다. 스레드가 일시 중지되거나 응답이 빠르면 빠뜨릴 수 있기 때문에 메시지를 보내고 싶지 않습니다. 이것은 내가에 생각할 수있는 가장 가까운 내 자신의RxJava로 구독 신청하기

connection.onReceivedMessage() 
      .doOnSubscribe(() -> connection.send(message)) 
      .filter(message -> message.id == id) 
      ... // do stuff 

또는

Observable.defer(() -> { 
    connection.send(message); 
    return connection.onReceivedMessage(); 
})... // do stuff 

하지만 여전히 메시지를 보낼 수 있고 응답을 수신 할 수없는 것처럼이 여전히 보인다. 다른 사람이 이것을 시도 했습니까? 나는 정말로 일종의 afterCreate()를 원한다고 느낀다.

답변

1

스레드가 일시 중단되거나 응답 속도가 빠르면 나는 그것을 놓칠 수 있기 때문에 메시지를 보내지 않으려 고합니다.

Subject을 사용하십시오. BehaviorSubject (항상 새 관찰자를 새 구독자에게 내 보냅니다) 또는 ReplySubject (새 구독자에게 방출 된 모든 Observable을 내 보냅니다) 중 하나입니다. 나는 전체 논리에 대해 잘 모르겠지만, 당신은 같은 것을 할 수 :

public BehaviorSubject mMessageBehaviorSubject = BehaviorSubject.create(); 


private void sendMessage() { 
    connection.onReceivedMessage() 
      .doOnSubscribe(() -> connection.send(message)) 
      .filter(message -> message.id == id) 
      .subscribe(mSubject::onNext, Throwable::printStackTrace); 
} 


public Observable<String> getMessageObservable() { 
    return mMessageBehaviorSubject.asObservable(); 
} 

이 경우에, 당신은 유 얻을 것이다들을 준비 메시지 때마다를 보낼 수 있습니다 이런 식으로, 최신 메시지가

를 전송
0

doOnSubscribe가 필요한 것입니다. 찾고있는 "afterCreate"입니다. 첫 번째 예에서 첫 번째 메시지는 구독 후 보내집니다.이 시점에서 Observable은 이미 응답을 처리 할 준비가되었습니다.

질문 사항 - 다른 사람이 시도 했습니까? - 응답은 예입니다. 첫 번째 예제와 동일한 기술을 사용하여 코드 일부를 rxify합니다.

+0

doOnSubscribe는 (는) 어떻게 작동합니까? 나는 이것을 더 시험해야 할 것이다. 나는 내가 단위 테스트를하지 않았 음을 인정할 것이다. – Buttink

관련 문제