rxjava를 처음 사용하면서 다음과 같은 문제가 발생했습니다.Observable에 공급되는 값
외부 시스템에서 객체가 불규칙하게 FIFO 대기열에 누적되었습니다. 매초마다 실행되는 Observable이 필요하며 대기열에서 항목을 가져 와서 항목을 구독자에게 내 보냅니다.
두 가지 문제 : 큐 항목은 관찰이 살아있는 동안, 선행 모든 항목을 제공 할 수 없습니다 생산
. 대기열이 비어있을 수 있으며,이 경우 Observable은 대기해야하며 아무 것도 방출하지 않아야합니다. (Observable이 항목이 일시 중지 된 후에 대기열에서 사용 가능 해지면 즉시 시작하는 것이 좋겠지 만 더 자주 폴링하지 않으려면 대기열이 Observable 일 필요가 있습니다. 아이디어.)
외부 시스템이 Observable을 완료 할 수 있어야합니다. 변수를 설정하고 Observable 내에서 변수를 읽을 수는 있지만 좀 더 우아한 방법이 있는지 알고 싶습니다. 그 아이템을 삭제하기 때문에
LinkedList<Layer> queue = new LinkedList<Layer>(); // the queue boolean stopObservable = false; // the variable to stop the observable Observable.create(new Observable.OnSubscribe<Layer>() { @Override public void call(Subscriber<? super Layer> subscriber) { try { if (!queue.isEmpty()) { Layer layer = queue.poll(); subscriber.onNext(layer); } else { if (stopObservable) { subscriber.onCompleted(); } } } catch (Exception e) { subscriber.onError(e); } } }).somethingThatCreatesTheInterval().subscribeOnEtc.
주기로
.throttleWithTimeout()이 더보기는하지만 항목을 삭제하는 것으로 보입니다.
rx 매우 멋지지만 들어가기가 어렵습니다. 모든 의견을 감사드립니다.
을 (물론이 고려) - 당신은 단지 "사람이 모든 (또는 단순히를 두 번째 건너 방출 할 입력 대기열에 사용 가능한 항목이없는 경우 "슬롯")? 내 첫 번째 본능은 타이머 ("펄스"를 제공)와 맵 (아무것도 맵핑하지 않지만 단순히 타이머로 방출되는 모든 Long을 버리고 대신 입력 큐에서 다음 항목을 방출 함)을 보거나 onCompleted stop 변수는 true로 설정됩니다). 하지만 어쩌면 거기에 더 우아한 대안이 ... –
실제로 두 번째 단계에서 map 대신 flatMap을 사용해야 할 수도 있습니다 - 입력 대기열이 비어있는 경우를 처리 할 수 있어야합니다. 따라서 Observable.just() 또는 Observable.empty() 중 하나를 내 보냅니다. –