2012-06-23 6 views
4

나는 인터리브 된 스트림이 I split into separate sequential streams입니다.Reactive Extensions를 사용하여 스트림을 동시에 처리 할 때 버퍼링을 제한하는 방법

생산국 outputs[]이 스트림은 아래에서 정의되는 주제이다 처리

int streamCount = 3; 

new MyIEnumerable<ElementType>() 
.ToObservable(Scheduler.ThreadPool) 
.Select((x,i) => new { Key = (i % streamCount), Value = x }) 
.Subscribe(x => outputs[x.Key].OnNext(x.Value)); 

. .ObserveOn()은 스트림을 동시에 처리하는 데 사용됩니다 (멀티 스레드).

소비자

var outputs = Enumerable.Repeat(0, streamCount).Select(_ => new Subject<char>()).ToArray();                          

outputs[0].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 0: {0}", x)); 
outputs[1].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 1: {0}", x)); 
outputs[2].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 2: {0}", x)); 

이 코드의 문제는 출력 스트림을 따라 잡을 수없는 경우에도, 가능한 한 빨리 전체 열거를 읽을 것입니다. 필자의 경우 열거 형은 파일 스트림이므로 많은 메모리를 사용하게됩니다. 따라서 버퍼가 약간의 임계 값에 도달하면 읽기를 차단하고 싶습니다.

답변

3

나는 아래와 같이 생산자와 소비자에 대한 세마포어를 사용하여이 문제를 해결했습니다. 그러나, 이것이 Rx 계약, 프로그래밍 스타일 등의 측면에서 좋은 해결책으로 여겨지는 것은 아닙니다.

var semaphore = new SemaphoreSlim(MAX_BUFFERED_ELEMENTS); 

// add to producer (before subscribe) 
.Do(_ => semaphore.Wait()); 

// add to consumer (before subscribe) 
.Do(_ => semaphore.Release())) 

그것은 좋은 생각 Wait()에 호출에 CancelationToken을 전달하고 스트림이 비정상적으로 정지 할 때이 취소되어 있는지 확인 할 수 있습니다?

2

나는 귀하의 솔루션이 매우 합리적이라고 생각합니다. 가장 큰 문제점 (이전 질문에 대한 배경 지식)은 현재 솔루션의 '내부'가 현재 어디에나 노출되어 있다는 것입니다. 그냥 당신이 제대로이를 코딩 할 때 다음과 같은 정리 있는지 확인하십시오

  • 랩의 모든 것을 하나의 방법을 노출하는 클래스로 : IDisposable Subscribe(<index>, Action) 또는 대안 IObservable<element> ToObservable(<index>))을. 반환 된 구독 또는 반환 된 관찰 가능 중 하나가 이미 수행 된 모든 '작업', 즉 추가 된 Do 작업 등을 갖습니다. 사전 또는 목록이 모두 있다는 사실은 사용자와 완전히 관련이 없어야합니다. 그렇지 않으면 여기에 코드를 변경하면 모든 곳에서 변경해야합니다.

  • CancelationToken 좋은 생각, 당신은 Do에 과부하를 사용하여 수행 할 수 OnCompleted 또는 OnError, 하나에 취소 할 수 있는지 확인합니다.

관련 문제