0
들어오는 처리 요청이 있습니다. 공유 자원이 고갈되어 동시 처리가 너무 많아지기를 원하지 않습니다. 나는 또한 몇 가지 독특한 키를 공유 요청이 동시에 실행되지 선호하는 것 : 키 당 관찰이 결코 완료하지 않기 때문에RxJava/RxScala에서 groupBy와 flatMap (maxConcurrent, ...)을 결합합니다.
def process(request: Request): Observable[Answer] = ???
requestsStream
.groupBy(request => request.key)
.flatMap(maxConcurrentProcessing, { case (key, requestsForKey) =>
requestsForKey
.flatMap(1, process)
})
그러나, 위에서 작동하지 않습니다. 이것을 달성하기위한 올바른 방법은 무엇입니까? 작동하지 않습니다 무엇
는 :
.flatMap(maxConcurrentProcessing, { case (key, requestsForKey) =>
// Take(1) unsubscribes after the first, causing groupBy to create a new observable, causing the next request to execute concurrently
requestsForKey.take(1)
.flatMap(1, process)
})
.flatMap(maxConcurrentProcessing, { case (key, requestsForKey) =>
// The idea was to unsubscribe after 100 milliseconds to "free up" maxConcurrentProcessing
// This discards all requests after the first if processing takes more than 100 milliseconds
requestsForKey.timeout(100.millis, Observable.empty)
.flatMap(1, process)
})
N 필자의 경우에는 요청 스트림이 오래되고 오래 지속되며 많은 다른 키가 포함되어 있기 때문에 작동하지 않습니다. 또한 추가 스레드를 생성하지 않고 동일한 스레드 풀을 사용하는 것을 선호합니다. – dtech
@dtech 아이디어는 특정 스레드에 작업을 예약 할 수 없지만 특정 스케줄러에 예약 할 수 있다는 것입니다. 단일 쓰레드 것들. 내 대답에 많은 수의 키에 대한 솔루션을 추가했습니다. –