2016-06-27 2 views
0

스트림의 현재 값으로 뷰를 업데이트해야하는 MVC 응용 프로그램이 있습니다. 내가 컨트롤러에서 사용RxJava 구독이 정확하게 구독 취소되지 않습니다.

public Observable<Integer> getStreamInstance(){ 
    if(stream == null){ 
     this.stream = Observable.create((Subscriber<? super Integer> subscriber) -> { 
      new HeartbeatStream(frequence,subscriber).start(); 
     }); 
    } 

    return stream; 
} 

스트림을 얻을 : 모델에서 나는이 방법을 가지고있다. 그런 다음 컨트롤러에 다음 두 가지 방법이 있습니다.

public void start(){ 
    this.sb = stream.subscribe((Integer v) -> { 
     view.updateCurrValue(v); 
    }); 
} 

public void stop(){ 
    this.sb.unsubscribe();  
} 

시작 방법을 사용하면보기의 레이블을 현재 값으로 간단히 업데이트 할 수 있습니다. 구독을 취소하여 업데이트를 중지 할 때까지 제대로 작동합니다. 실제로보기에서 버튼 "정지"를 누르면 레이블이 현재 값으로 계속 업데이트되고 "시작"을 다시 누르면 두 개의 다른 스트림에서 레이블이 표시됩니다. 처음에는 "시작"을하고 두 번째는 "시작"을 두 번째로 눌러 생성 된 것처럼 보입니다. 어디서 잘못 되었나요?

편집 :

public class HeartbeatStream extends Thread{ 

private Subscriber<? super Integer> subscriber; 
private int frequence; 
private HeartbeatSensor sensor; 

public HeartbeatStream(int freq, Subscriber<? super Integer> subscriber){ 
    this.frequence = freq; 
    this.subscriber = subscriber; 
    sensor = new HeartbeatSensor(); 
} 

public void run(){ 

    while(true){   
     try { 
      subscriber.onNext(sensor.getCurrentValue()); 
      Thread.sleep(frequence); 
     } catch (Exception e) { 
      subscriber.onError(e); 
     } 
    } 


} 

이것은 HeartbeatStream 클래스입니다. HeartbeatSensor는 하트 비트 빈도를 시뮬레이트하는 값을 주기적으로 생성하는 클래스입니다.

답변

0

귀하의 HeartbeatStream 클래스에서 어떤 일이 벌어지는 지 알 수는 없지만 unsubscribe은 처리하지 않는 것으로 보입니다.

Observable.create으로 만들면 subscriber.isUnsubscribed()으로 명시 적으로 수신 거부를 처리해야합니다.

Observable을 만드는 데 유틸리티 메서드를 사용할 수 있습니다.이 메서드는 모두 Observable.just() 또는 Observable.from()과 같이이 모든 것을 처리합니다.

그래도 도움이되지 않는다면 HeartbeatStream 클래스를 게시하십시오.

은 자세한 내용은에게 문서를 참조하십시오 : https://github.com/ReactiveX/RxJava/wiki/Creating-Observables https://github.com/ReactiveX/RxJava/wiki/Async-Operators

+0

heartbeatStream 클래스를 추가했습니다. – User999

2

난 당신이 정기적으로 화면 갱신을 트리거 몇 가지 이벤트를 알리기 위해 노력 같은데요. 디자인에 의해

Observable<Long> timer = Observable.interval(period, TimeUnit.MILLISECONDS, 
    AndroidSchedulers.mainThread()); 

SerialSubscription serial = new SerialSubscription(); 

public void start() { 
    serial.set(timer.subscribe(v -> view.updateCurrValue(v))); 
} 

public void stop() { 
    serial.set(Subscriptions.unsubscribed()); 
} 

public void onDestroy() { 
    serial.unsubscribe(); 
} 
관련 문제