나는 인터리브 된 스트림이 I split into separate sequential streams입니다.Reactive Extensions를 사용하여 스트림을 동시에 처리 할 때 버퍼링을 제한하는 방법
생산국 outputs[]
이 스트림은 아래에서 정의되는 주제이다 처리
int streamCount = 3;
new MyIEnumerable<ElementType>()
.ToObservable(Scheduler.ThreadPool)
.Select((x,i) => new { Key = (i % streamCount), Value = x })
.Subscribe(x => outputs[x.Key].OnNext(x.Value));
. .ObserveOn()
은 스트림을 동시에 처리하는 데 사용됩니다 (멀티 스레드).
소비자
var outputs = Enumerable.Repeat(0, streamCount).Select(_ => new Subject<char>()).ToArray();
outputs[0].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 0: {0}", x));
outputs[1].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 1: {0}", x));
outputs[2].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 2: {0}", x));
이 코드의 문제는 출력 스트림을 따라 잡을 수없는 경우에도, 가능한 한 빨리 전체 열거를 읽을 것입니다. 필자의 경우 열거 형은 파일 스트림이므로 많은 메모리를 사용하게됩니다. 따라서 버퍼가 약간의 임계 값에 도달하면 읽기를 차단하고 싶습니다.