2010-06-13 2 views
5

여러 스레드가 항목을 넣을 수있는 동안 여러 스레드를 사용하여 동시 대기열에서 항목을 처리하려고 할 때 이상적인 해결책은 Reactive Extensions 동시 데이터 구조 항목이 투입 될 때 지속적으로 dequeueing됩니다 LINQ (또는 PLINQ) 쿼리를 가지고 어떤 방법이 있는지 IObservable/IObserver를 ConcurrentQueue 또는 ConcurrentStack과 함께 사용하는 방법

While using ConcurrentQueue, trying to dequeue while looping through in parallel

그래서 내가 궁금 :

내 원래의 질문이다.

나는 이것을 n 개의 프로듀서가 큐에 넣고 제한된 수의 스레드를 처리하도록 할 수있는 방식으로 작업하려고하므로 데이터베이스에 과부하가 걸리지 않습니다.

만약 내가 Rx 프레임 워크를 사용할 수 있다면, 나는 단지 그것을 시작할 수있을 것이라고 기대한다. 그리고 100 개의 아이템이 100ms 내에 있다면, PLINQ 쿼리의 일부인 20 개의 스레드는 큐를 통해 처리 할 뿐이다.

내가 함께 일하려고 세 가지 기술이 있습니다

  1. 수신 프레임 워크 (반응성 LINQ)
  2. PLING
  3. System.Collections.Concurrent 구조
+0

여기에서 Rx가 당신을 도울 것으로 기대되는 방법에 대해 자세히 설명해 줄 수 있습니까? –

+0

@ 리차드 스살 레이 (Richard Szalay) - 마지막에 언급했듯이, 대기열에 무엇이 있는지보기 위해 설문 조사를 할 필요가 없다는 것이 내 생각입니다. 무언가가 거기에 놓여질 때 반응 할 수 있습니다. 따라서 많은 수의 항목 갑자기 여러 스레드에서 처리를 할 수 있습니다. 나는 지금 당장하고있는 투표를 피하려고 노력하고있다. –

답변

3

내가 돈이 ' Rx로 이것을 달성하는 최선의 방법을 알고 있지만, BlockingCollection<T>producer-consumer pattern을 사용하는 것이 좋습니다. 기본 스레드는 컬렉션 아래에 기본값으로 ConcurrentQueue<T>을 사용하는 항목을 추가합니다. 그런 다음 BlockingCollection<T>에 대해 Parallel::ForEach을 사용하여 컬렉션에서 많은 항목을 처리하여 동시에 시스템에 적합하다고 생각되는 별도의 Task이 있습니다. 이제는 기본 파티셔너가 원하는 경우보다 더 많은 오버 헤드를 생성하므로 ParallelExtensions 라이브러리의 GetConsumingPartitioner 메서드를 사용하여 가장 효율적으로 사용할 수 있습니다. 자세한 내용은 this blog post에서 확인할 수 있습니다.

는 메인 스레드가 완료되면 당신은에 BlockingCollection<T>Task::WaitCompleteAdding를 호출 Task 당신이 컬렉션의 모든 항목을 처리 할 때 모든 소비자를 기다리는 스핀 업.

+0

'BlockingCollection'을 사용하는 주된 catch는 소비하는 스레드가 차단된다는 것입니다. Observable 패턴은 처리 할 항목이있을 때만 스레드를 사용합니다. –

6

드류가 맞습니다. 실제로 ConcurrentQueue는 실제로 BlockingCollection이 사용하는 기본 데이터 구조라고 생각합니다. 아주 내 앞에 너무 보인다. 이 책의 7 장 * http://www.amazon.co.uk/Parallel-Programming-Microsoft-NET-Decomposition/dp/0735651590/ref=sr_1_1?ie=UTF8&qid=1294319704&sr=8-1 을 확인하고 BlockingCollection을 사용하는 방법과 여러 생성자와 여러 소비자가 각각 "대기열"에서 벗어나는 방법을 설명합니다. "GetConsumingEnumerable()"메소드를 살펴보고, 아마도 .ToObservable()을 호출하기를 원할 것입니다.

* 나머지는 꽤 평균입니다.

편집 : 여기

난 당신이 원하는 것을 생각 샘플 프로그램입니까?

class Program 
{ 
    private static ManualResetEvent _mre = new ManualResetEvent(false); 
    static void Main(string[] args) 
    { 
     var theQueue = new BlockingCollection<string>(); 
     theQueue.GetConsumingEnumerable() 
      .ToObservable(Scheduler.TaskPool) 
      .Subscribe(x => ProcessNewValue(x, "Consumer 1", 10000000)); 

     theQueue.GetConsumingEnumerable() 
      .ToObservable(Scheduler.TaskPool) 
      .Subscribe(x => ProcessNewValue(x, "Consumer 2", 50000000)); 

     theQueue.GetConsumingEnumerable() 
      .ToObservable(Scheduler.TaskPool) 
      .Subscribe(x => ProcessNewValue(x, "Consumer 3", 30000000)); 


     LoadQueue(theQueue, "Producer A"); 
     LoadQueue(theQueue, "Producer B"); 
     LoadQueue(theQueue, "Producer C"); 

     _mre.Set(); 

     Console.WriteLine("Processing now...."); 

     Console.ReadLine(); 
    } 

    private static void ProcessNewValue(string value, string consumerName, int delay) 
    { 
     Thread.SpinWait(delay); 
     Console.WriteLine("{1} consuming {0}", value, consumerName); 
    } 

    private static void LoadQueue(BlockingCollection<string> target, string prefix) 
    { 
     var thread = new Thread(() => 
            { 
             _mre.WaitOne(); 
             for (int i = 0; i < 100; i++) 
             { 
              target.Add(string.Format("{0} {1}", prefix, i)); 
             } 
            }); 
     thread.Start(); 
    } 
} 
+0

그건 .... 독창적 인 사람 ... Rx를 BlockingCollection과 연결하는 것입니다. 와우 .. 이걸로 파이프 라인을 만들 수도 있습니다 : https://msdn.microsoft.com/en-us/library/ff963548.aspx – Oooogi

관련 문제