2017-01-17 14 views
0

들어오는 처리 요청이 있습니다. 공유 자원이 고갈되어 동시 처리가 너무 많아지기를 원하지 않습니다. 나는 또한 몇 가지 독특한 키를 공유 요청이 동시에 실행되지 선호하는 것 : 키 당 관찰이 결코 완료하지 않기 때문에RxJava/RxScala에서 groupBy와 flatMap (maxConcurrent, ...)을 결합합니다.

def process(request: Request): Observable[Answer] = ??? 

requestsStream 
    .groupBy(request => request.key) 
    .flatMap(maxConcurrentProcessing, { case (key, requestsForKey) => 
     requestsForKey 
     .flatMap(1, process) 
    }) 

그러나, 위에서 작동하지 않습니다. 이것을 달성하기위한 올바른 방법은 무엇입니까? 작동하지 않습니다 무엇

는 :

.flatMap(maxConcurrentProcessing, { case (key, requestsForKey) => 
     // Take(1) unsubscribes after the first, causing groupBy to create a new observable, causing the next request to execute concurrently 
     requestsForKey.take(1) 
     .flatMap(1, process) 
    }) 

.flatMap(maxConcurrentProcessing, { case (key, requestsForKey) => 
     // The idea was to unsubscribe after 100 milliseconds to "free up" maxConcurrentProcessing 
     // This discards all requests after the first if processing takes more than 100 milliseconds 
     requestsForKey.timeout(100.millis, Observable.empty) 
     .flatMap(1, process) 
    }) 

답변

1

여기에 내가 이것을 달성하기 위해 관리하는 방법입니다. (같은 키를 사용하여 메시지가 순서대로 처리되도록) 각 고유 키를 들어 내가 전용 단일 스레드 스케줄러를 할당하고 내 경우

@Test 
public void groupBy() throws InterruptedException { 
    final int NUM_GROUPS = 10; 
    Observable.interval(1, TimeUnit.MILLISECONDS) 
      .map(v -> { 
       logger.info("received {}", v); 
       return v; 
      }) 
      .groupBy(v -> v % NUM_GROUPS) 
      .flatMap(grouped -> { 
       long key = grouped.getKey(); 
       logger.info("selecting scheduler for key {}", key); 
       return grouped 
         .observeOn(assignScheduler(key)) 
         .map(v -> { 
          String threadName = Thread.currentThread().getName(); 
          Assert.assertEquals("proc-" + key, threadName); 
          logger.info("processing {} on {}", v, threadName); 
          return v; 
         }) 
         .observeOn(Schedulers.single()); // re-schedule 
      }) 
      .subscribe(v -> logger.info("got {}", v)); 

    Thread.sleep(1000); 
} 

키 (NUM_GROUPS)의 수하는 작은 그래서위한 전용 스케줄러를 만들 각 키 :

키의 수는 각각에 대해 스레드를 바치고 무한거나 너무 큰 경우
Scheduler assignScheduler(long key) { 
    return Schedulers.from(Executors.newSingleThreadExecutor(
     r -> new Thread(r, "proc-" + key))); 
} 

, 당신이 스케줄러의 풀을 생성하고 다음과 같이 다시 사용할 수 있습니다

Scheduler assignScheduler(long key) { 
    // assign randomly 
    return poolOfSchedulers[random.nextInt(SIZE_OF_POOL)]; 
} 
+0

N 필자의 경우에는 요청 스트림이 오래되고 오래 지속되며 많은 다른 키가 포함되어 있기 때문에 작동하지 않습니다. 또한 추가 스레드를 생성하지 않고 동일한 스레드 풀을 사용하는 것을 선호합니다. – dtech

+0

@dtech 아이디어는 특정 스레드에 작업을 예약 할 수 없지만 특정 스케줄러에 예약 할 수 있다는 것입니다. 단일 쓰레드 것들. 내 대답에 많은 수의 키에 대한 솔루션을 추가했습니다. –