2017-12-25 1 views
2

나는이 시나리오를 가지고 있습니다 : 꽤 빨리 항목을 생성하는 스트림 UI 스레드에서 많은 작업을하는 구독자입니다.RxJava - 다른 스트림을 기반으로 구독 취소 (다시)

특정 조건이 충족되면 추가 작업을 저장하지 않으려 고합니다.

다시 구독하는 경우 추가 작업을하고 싶습니다.

예 :네트워크 이벤트는 항목에 대한 업데이트를 생성하고 모델에서이를 수신하고 항목을 업데이트하며 업데이트 된 항목으로 새로운 관찰 가능 항목을 노출합니다. 화면이 모델의 스트림을 구독하고 그에 따라 GUI를 업데이트합니다. 새 화면이 위에 열리므로 이전 화면에서 더 이상 GUI를 업데이트 할 필요가 없습니다. - 모델을 계속 업데이트하지 않는 동안 (모델이 계속 업데이트되는 동안) 새 화면이 닫히면 이전 화면이 다시 볼 수있게됩니다. 다시 구독해야하며 시간을 들으므로 감상을 새로 고침해야합니다.

나는 몇 가지 솔루션을 가지고 있지만 아마 더 나은 하나가 기분이 : 나는 상태를 확인하기 위해 필터를 사용할 수 있습니다

. 그리고 그 항목들을 그냥 필터링하는 것보다 더 효율적일 것이므로 전체 구독을 선호합니다.

getItemsUpdateObs().filter(o -> isScreenVisible()).subscribe(...); 

구독하고 상태 이벤트를 수신 대기하는 데 사용할 수 있습니다. 그때마다 새로운 구독을 통해 해당 이벤트에 따라 구독/구독 취소하십시오. 이것은 기능적인 것이 아닌 절차 적 솔루션입니다. 나는 창 연산자를 사용하여 그것을 평평하게 만들 것을 생각했지만, 이것은 솔직한 해결책이라고 확신하지 못합니다.

isScreenVisibleObs().subscribe(isVisible -> { 
if (isVisible){ 
    subscription = getItemsUpdateObs().subscribe(...) 
} else { 
    if (subscription != null) subscription.unsubscribe() 
}}); 

의견이 있으십니까?

나는이처럼하고 싶으면 :

+0

지금까지 답변을 얻은 것 같아서 코드 예제를 추가했습니다. – ndori

답변

1

저장 Observable.subscribe의 반환 유형() 메소드 listenWhen 여기에 질문

getItemsUpdateObs().compose(listenWhen(isScreenVisibleObs()).subscribe(...) 

을 .... 그냥 (/ 실행 완료) 이전의 통화 상태 확인 다음 요청을 제공 할 때마다 다음과 같이 이전 화면의 onActivityResult를에 을 요청 다시

Subscription subscription = Observable.subscribe(new Subscriber<Type>() { 
     @Override 
     public void onCompleted() { 
     } 

     @Override 
     public void onError(Throwable e) { 
     } 

     @Override 
     public void onNext(String responseString) { 
     } 
    }); 

를 다시 시작합니다. 완료되지 않은 경우 이전 호출을 취소하고 다음과 같이 새 요청을 제공하십시오.

if(subscription!=null && !subscription.isUnsubscribed()){ 
     //Cancel(unSubscribe) the request if running(not completed) 
     subscription.unsubscribe(); 
     //START A NEW REQUEST HERE 
}else{ 
     //already completed so START A NEW REQUEST 
} 
0

이전 구독이 끝났다고해도 걱정하지 않아도됩니다. 새 요청에 대한 새 구독자를 만들려는 다음 창에서 구독을 구독 취소하면 모든 구독자가 자동으로 구독을 취소합니다.

그런 다음 새 구독자를 만들고 구독에 다시 구독하십시오.

/** 
* You can in any moment unsubscribe all subscriber of a subscription and create a new one again. 
* @throws InterruptedException 
*/ 
@Test 
public void subscribeAndUnsubscribe() throws InterruptedException { 
    Integer[] numbers = {0, 1, 2, 4, 5, 6}; 
    Observable<Integer> observable = Observable.from(numbers); 
    Subscription subscription = observable.subscribeOn(Schedulers.newThread()).subscribe(createSubscriber()); 
    Thread.sleep(2000); 
    subscription.unsubscribe(); 
    subscription = observable.subscribeOn(Schedulers.newThread()).subscribe(createSubscriber()); 
    Thread.sleep(10000); 
    System.out.println(subscription.isUnsubscribed()); 
} 

private ActionSubscriber createSubscriber() { 
    return new ActionSubscriber(number -> { 
     try { 
      Thread.sleep(500); 
     } catch (InterruptedException e) { 
      System.out.println("preparing to unsubscribe"); 
     } 
     System.out.println("Subscriber number:" + number); 
    }, 
      System.out::println, 
      () -> System.out.println("Subscriber End of pipeline")); 
} 

당신은 더 여기 https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/creating/ObservableSubscription.java

1

switchMap()라고 RxJava 내에서 매우 유용한 연산자가있다 볼 수 있습니다. 그것은 관찰 가능을 기반으로 구독을 전환합니다.

먼저 설정하여 관찰 할 수있는 화면이 활동중인 여부에 따라 : 기반으로

다음
Observable<Boolean> isActive; 

, 스위치가 관찰 :

isActive 
    .switchMap(active -> active ? getItemsUpdateObs() : Observable.never()) 
    .subscribe(...); 

때 화면이있는 항목 만 업데이트에 가입한다 활성. 화면 당 하나의 관찰 가능해야하지만, 그들은 저렴합니다. 이렇게하면 관찰 할 수있는 체인이 자체 상태를 유지하고 구독 및 다시 구독의 올바른 관리 및 타이밍이 처리됩니다.