2016-08-25 2 views
0

난수 스트림이 있습니다.두 개의 이벤트 흐름을 동시에 나누어 처리하는 방법은 무엇입니까?

rx.Observable 
.range (0, 1000) 
.map (() -> 200d * Math.random()) 

나는 흐름이 2 개로 분할되어야합니다. 숫자가 100보다 작고 숫자가 100보다 많습니다.

숫자가 100보다 작은 경우 (chain1) : 네트워크에 request1을 수행하고 답변을 기다린 후 다른 연산자의 프로세스 체인 1을 계속 수행해야합니다. .

숫자가 100을 초과하는 경우 (chain2) : 대답 request2을 보내고 응답을 기다린 후 프로세스 체인 운영자를 계속 진행하십시오.

request1request2은 서로를 기다리지 않고 체인이 병렬로 수행됩니다. 그러나 체인 처리 내에서 요청 응답을 기다려야합니다.

어떻게합니까?

답변

0
rx.Observable 
         .create(subscriber -> { 
          for (int i = 0; i < 100; i++) { 
           subscriber.onNext(i); 
           Log.i("Iniop", "Create thread name: " + Thread.currentThread().getName()); 
          } 
          subscriber.onCompleted(); 
         }) 
         .onBackpressureBuffer() 
         .observeOn(Schedulers.computation()) 
         .subscribeOn(Schedulers.computation()) 
         .map(v -> { 
          Log.i("Iniop", "Map thread name: " + Thread.currentThread().getName()); 
          return 200d * Math.random(); 
         }) 
         .groupBy(k -> { 
            Log.i("Iniop", "Group thread name: " + Thread.currentThread().getName()); 
            return k > 100 ? "yes" : "no"; 
           } 
           , v -> v) 
         .forEach(gO -> gO.observeOn(Schedulers.newThread()) 
             .map(v -> new Pair<String, Double>(gO.getKey(), v)) 
             .subscribe(v -> { 
                Log.i("Iniop", "Key: " + v.first); 
                Log.i("Iniop", "Value: " + v.second); 
                Log.i("Iniop", "Thread name: " + Thread.currentThread().getName()); 
               } 
               , e -> Log.e("Iniop", "Err", e)) 
           , e -> Log.e("Iniop", "Err", e)); 
+0

'map' 및'groupBy' 연산자가 계산 스케줄러에서 실행되도록하기 전에'subscribeOn'을 이동시킬 수 있습니다. – JohnWowUs

관련 문제