드류가 맞습니다. 실제로 ConcurrentQueue는 실제로 BlockingCollection이 사용하는 기본 데이터 구조라고 생각합니다. 아주 내 앞에 너무 보인다. 이 책의 7 장 * http://www.amazon.co.uk/Parallel-Programming-Microsoft-NET-Decomposition/dp/0735651590/ref=sr_1_1?ie=UTF8&qid=1294319704&sr=8-1 을 확인하고 BlockingCollection을 사용하는 방법과 여러 생성자와 여러 소비자가 각각 "대기열"에서 벗어나는 방법을 설명합니다. "GetConsumingEnumerable()"메소드를 살펴보고, 아마도 .ToObservable()을 호출하기를 원할 것입니다.
* 나머지는 꽤 평균입니다.
편집 : 여기
난 당신이 원하는 것을 생각 샘플 프로그램입니까?
class Program
{
private static ManualResetEvent _mre = new ManualResetEvent(false);
static void Main(string[] args)
{
var theQueue = new BlockingCollection<string>();
theQueue.GetConsumingEnumerable()
.ToObservable(Scheduler.TaskPool)
.Subscribe(x => ProcessNewValue(x, "Consumer 1", 10000000));
theQueue.GetConsumingEnumerable()
.ToObservable(Scheduler.TaskPool)
.Subscribe(x => ProcessNewValue(x, "Consumer 2", 50000000));
theQueue.GetConsumingEnumerable()
.ToObservable(Scheduler.TaskPool)
.Subscribe(x => ProcessNewValue(x, "Consumer 3", 30000000));
LoadQueue(theQueue, "Producer A");
LoadQueue(theQueue, "Producer B");
LoadQueue(theQueue, "Producer C");
_mre.Set();
Console.WriteLine("Processing now....");
Console.ReadLine();
}
private static void ProcessNewValue(string value, string consumerName, int delay)
{
Thread.SpinWait(delay);
Console.WriteLine("{1} consuming {0}", value, consumerName);
}
private static void LoadQueue(BlockingCollection<string> target, string prefix)
{
var thread = new Thread(() =>
{
_mre.WaitOne();
for (int i = 0; i < 100; i++)
{
target.Add(string.Format("{0} {1}", prefix, i));
}
});
thread.Start();
}
}
여기에서 Rx가 당신을 도울 것으로 기대되는 방법에 대해 자세히 설명해 줄 수 있습니까? –
@ 리차드 스살 레이 (Richard Szalay) - 마지막에 언급했듯이, 대기열에 무엇이 있는지보기 위해 설문 조사를 할 필요가 없다는 것이 내 생각입니다. 무언가가 거기에 놓여질 때 반응 할 수 있습니다. 따라서 많은 수의 항목 갑자기 여러 스레드에서 처리를 할 수 있습니다. 나는 지금 당장하고있는 투표를 피하려고 노력하고있다. –