2014-09-28 3 views
0

rxjava를 처음 사용하면서 다음과 같은 문제가 발생했습니다.Observable에 공급되는 값

외부 시스템에서 객체가 불규칙하게 FIFO 대기열에 누적되었습니다. 매초마다 실행되는 Observable이 필요하며 대기열에서 항목을 가져 와서 항목을 구독자에게 내 보냅니다.

두 가지 문제 : 큐 항목은 관찰이 살아있는 동안, 선행 모든 항목을 제공 할 수 없습니다 생산

  • . 대기열이 비어있을 수 있으며,이 경우 Observable은 대기해야하며 아무 것도 방출하지 않아야합니다. (Observable이 항목이 일시 중지 된 후에 대기열에서 사용 가능 해지면 즉시 시작하는 것이 좋겠지 만 더 자주 폴링하지 않으려면 대기열이 Observable 일 필요가 있습니다. 아이디어.)

  • 외부 시스템이 Observable을 완료 할 수 있어야합니다. 변수를 설정하고 Observable 내에서 변수를 읽을 수는 있지만 좀 더 우아한 방법이 있는지 알고 싶습니다. 그 아이템을 삭제하기 때문에

    LinkedList<Layer> queue = new LinkedList<Layer>(); // the queue 
    boolean stopObservable = false; // the variable to stop the observable 
    
    Observable.create(new Observable.OnSubscribe<Layer>() { 
    
        @Override public void call(Subscriber<? super Layer> subscriber) { 
         try { 
          if (!queue.isEmpty()) { 
           Layer layer = queue.poll(); 
           subscriber.onNext(layer); 
          } else { 
           if (stopObservable) { subscriber.onCompleted(); } 
          } 
         } catch (Exception e) { 
          subscriber.onError(e); 
         } 
        } 
    
    }).somethingThatCreatesTheInterval().subscribeOnEtc. 
    
  • 주기로

, 난) (.sample 사용할 수 있고, 모든 항목이 방출되는 것이 중요하다.

.throttleWithTimeout()이 더보기는하지만 항목을 삭제하는 것으로 보입니다.

rx 매우 멋지지만 들어가기가 어렵습니다. 모든 의견을 감사드립니다.

+0

을 (물론이 고려) - 당신은 단지 "사람이 모든 (또는 단순히를 두 번째 건너 방출 할 입력 대기열에 사용 가능한 항목이없는 경우 "슬롯")? 내 첫 번째 본능은 타이머 ("펄스"를 제공)와 맵 (아무것도 맵핑하지 않지만 단순히 타이머로 방출되는 모든 Long을 버리고 대신 입력 큐에서 다음 항목을 방출 함)을 보거나 onCompleted stop 변수는 true로 설정됩니다). 하지만 어쩌면 거기에 더 우아한 대안이 ... –

+0

실제로 두 번째 단계에서 map 대신 flatMap을 사용해야 할 수도 있습니다 - 입력 대기열이 비어있는 경우를 처리 할 수 ​​있어야합니다. 따라서 Observable.just() 또는 Observable.empty() 중 하나를 내 보냅니다. –

답변

1

정기적 인 시간 간격으로 외부 웹 서비스를 폴링 할 때 비슷한 작업을 수행했습니다.

  1. 시간 간격은 timer입니다. 관찰 체인 폴링하고 해당 레이어가 null의 경우는, 어쩌면, 하나 개의 레이어를 선택합니다 1S의 단위로 각 틱에 따라 아무 것도 이제 전체에게 당신이 takeUntil을 추가 할 수 있습니다 체인을 중단 할 경우

    Observable.timer(0, 1, TimeUnit.SECOND) 
        .flatMap(tick -> Observable.just(queue.poll()).filter(layer -> layer != null)) 
        .subscribe(layer -> System.out.format("The layer is : %s", layer)); 
    
  2. 을 방출하지 않습니다 . 그래서 외부 시스템이 이후의 가입이 중지됩니다 stopObservable에 뭔가 제출 중지 할 때 당신은 내가 일부 단어를 잘못 입력 할 수 있습니다 가정 할 수 있도록

    // somewhere before 
    PublishSubject stopNotifier = PublishSubject.create(); 
    
    // somewhere process the queue 
    Observable.timer(0, 1, TimeUnit.SECOND) 
        .takeUntil(stopNotifier) 
        .flatMap(tick -> Observable.just(queue.poll())) 
        .subscribe(layer -> System.out.format("The layer is : %s", layer)); 
    
    // when not anymore interested (calling onComplete works too) 
    stopNotifier.onNext("cancel everything about the queue"); 
    

나는 태블릿에서이 응답을 쓰고 있어요 또는)

0

가능한 경우 LinkedList<Layer> 대신 PublishSubject<Layer>을 사용해야합니다.그런 다음 외부 시스템은 publishSubject.onNext을 호출하여 새 항목을 제공 할 수 있으며 PublishSubjectObservable의 하위 클래스이므로 시스템에서 Observable로 처리 할 수 ​​있으며 원하는 타이밍과 관련된 의미에 따라 다음 연산자 중 하나를 적용하십시오. 그것은 :

  • sample
  • debounce
  • throttleFirst/throttleLast/throttleWithTimeout
  • .zipWith(Observable.timer(1, TimeUnit.SECONDS), (value, tick) -> value) (! 버퍼링을 많이 할 수 있음)
  • 전혀 타이밍 수정 입력 항목의 큐가 시간이 지남에 쌓이는 경우에 당신이 상관없는, 그래서
관련 문제