당신이 원하는 : 후드에서
Observable<String> myObservable = Observable.just("firstValue", "secondValue");
String result = myObservable.toBlocking().first();
System.out.println(result); // ---> "firstValue"
, 당신을위한 가입 BlockingObservable.first()
를 않습니다 전화 :
private T blockForSingle(final Observable<? extends T> observable) {
final AtomicReference<T> returnItem = new AtomicReference<T>();
final AtomicReference<Throwable> returnException = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);
@SuppressWarnings("unchecked")
Subscription subscription = ((Observable<T>)observable).subscribe(new Subscriber<T>() {
@Override
public void onCompleted() {
latch.countDown();
}
@Override
public void onError(final Throwable e) {
returnException.set(e);
latch.countDown();
}
@Override
public void onNext(final T item) {
returnItem.set(item);
}
});
BlockingUtils.awaitForComplete(latch, subscription);
if (returnException.get() != null) {
Exceptions.propagate(returnException.get());
}
return returnItem.get();
}
UPDATE : 어떤 이해가되지 않는 경우 사용 BehaviourSubject
+ toBlocking()
. 그것이 Observable
및 Observer
이므로 어딘가에서 myObservable.onNext("value")
이 호출되어야한다는 것을 고려해야합니다. toBlocking()
을 호출하여 스레드를 차단하면 onNext()
이 호출 된 다른 스레드에서 myObservable
을 사용할 수 없다면 차단 될 수 있습니다.
예를 들어, 이것은`BehaviourSubject의 정상적인 사용하는 것입니다 :
// observer will receive the "one", "two" and "three" events, but not "zero"
BehaviorSubject<Object> subject = BehaviorSubject.create("default");
subject.onNext("zero");
subject.onNext("one");
subject.subscribe(observer);
subject.onNext("two");
subject.onNext("three");
흥미로운, 나는 이것이 BehaviorSubject 실제로 .. 변화를 결정 하는가하는 것을 잊지? 그것은 나를 위해 모든 스레드를 무기한 차단합니다. – breakline