2016-09-16 1 views
2

차단에 가입 :RxJava 내가 다음 달성하고 싶습니다 관찰

String result = myObservable.toBlocking().first(); 

즉 그것은 일반 함수 호출과 같다. 그러나 당신이 그것을 구독해야하기 때문에 이것은 결코 일어나지 않습니다. 나는 그것을하는 법을 모릅니다. 만약 내가 그것을 구독한다면, 그 결과는 다른 범위에있게 될 것이고, 코드는 어쨌든 규칙적인 관찰 결과와 같은 결과만을 얻을 수 있기 때문에 매우 못생긴다. 그래서 그것을 블로킹 관찰로 바꿀 필요가 없다. 그것은 실제로 작동

답변

1

당신이 원하는 : 후드에서

Observable<String> myObservable = Observable.just("firstValue", "secondValue"); 
    String result = myObservable.toBlocking().first(); 
    System.out.println(result); // ---> "firstValue" 

, 당신을위한 가입 BlockingObservable.first()를 않습니다 전화 :

private T blockForSingle(final Observable<? extends T> observable) { 
    final AtomicReference<T> returnItem = new AtomicReference<T>(); 
    final AtomicReference<Throwable> returnException = new AtomicReference<Throwable>(); 
    final CountDownLatch latch = new CountDownLatch(1); 

    @SuppressWarnings("unchecked") 
    Subscription subscription = ((Observable<T>)observable).subscribe(new Subscriber<T>() { 
     @Override 
     public void onCompleted() { 
      latch.countDown(); 
     } 

     @Override 
     public void onError(final Throwable e) { 
      returnException.set(e); 
      latch.countDown(); 
     } 

     @Override 
     public void onNext(final T item) { 
      returnItem.set(item); 
     } 
    }); 
    BlockingUtils.awaitForComplete(latch, subscription); 

    if (returnException.get() != null) { 
     Exceptions.propagate(returnException.get()); 
    } 

    return returnItem.get(); 
} 

UPDATE : 어떤 이해가되지 않는 경우 사용 BehaviourSubject + toBlocking(). 그것이 ObservableObserver이므로 어딘가에서 myObservable.onNext("value")이 호출되어야한다는 것을 고려해야합니다. toBlocking()을 호출하여 스레드를 차단하면 onNext()이 호출 된 다른 스레드에서 myObservable을 사용할 수 없다면 차단 될 수 있습니다.

예를 들어, 이것은`BehaviourSubject의 정상적인 사용하는 것입니다 :

// observer will receive the "one", "two" and "three" events, but not "zero" 
    BehaviorSubject<Object> subject = BehaviorSubject.create("default"); 
    subject.onNext("zero"); 
    subject.onNext("one"); 
    subject.subscribe(observer); 
    subject.onNext("two"); 
    subject.onNext("three"); 
+0

흥미로운, 나는 이것이 BehaviorSubject 실제로 .. 변화를 결정 하는가하는 것을 잊지? 그것은 나를 위해 모든 스레드를 무기한 차단합니다. – breakline

관련 문제