2009-11-01 6 views
21

저는 최근에 생산자/소비자 패턴 C# 구현을 접했습니다. 그것은 매우 간단합니다 (적어도 나를 위해) 매우 우아합니다.C# producer/consumer

가 2006 주위에 고안 한 것 같다, 그래서이 구현은
경우 내가 궁금

- 안전
- 여전히 적용

코드 (원본 코드가 http://bytes.com/topic/net/answers/575276-producer-consumer#post2251375에서 참조 된) 이하

using System; 
using System.Collections; 
using System.Threading; 

public class Test 
{ 
    static ProducerConsumer queue; 

    static void Main() 
    { 
     queue = new ProducerConsumer(); 
     new Thread(new ThreadStart(ConsumerJob)).Start(); 

     Random rng = new Random(0); 
     for (int i=0; i < 10; i++) 
     { 
      Console.WriteLine ("Producing {0}", i); 
      queue.Produce(i); 
      Thread.Sleep(rng.Next(1000)); 
     } 
    } 

    static void ConsumerJob() 
    { 
     // Make sure we get a different random seed from the 
     // first thread 
     Random rng = new Random(1); 
     // We happen to know we've only got 10 
     // items to receive 
     for (int i=0; i < 10; i++) 
     { 
      object o = queue.Consume(); 
      Console.WriteLine ("\t\t\t\tConsuming {0}", o); 
      Thread.Sleep(rng.Next(1000)); 
     } 
    } 
} 

public class ProducerConsumer 
{ 
    readonly object listLock = new object(); 
    Queue queue = new Queue(); 

    public void Produce(object o) 
    { 
     lock (listLock) 
     { 
      queue.Enqueue(o); 

      // We always need to pulse, even if the queue wasn't 
      // empty before. Otherwise, if we add several items 
      // in quick succession, we may only pulse once, waking 
      // a single thread up, even if there are multiple threads 
      // waiting for items.    
      Monitor.Pulse(listLock); 
     } 
    } 

    public object Consume() 
    { 
     lock (listLock) 
     { 
      // If the queue is empty, wait for an item to be added 
      // Note that this is a while loop, as we may be pulsed 
      // but not wake up before another thread has come in and 
      // consumed the newly added object. In that case, we'll 
      // have to wait for another pulse. 
      while (queue.Count==0) 
      { 
       // This releases listLock, only reacquiring it 
       // after being woken up by a call to Pulse 
       Monitor.Wait(listLock); 
      } 
      return queue.Dequeue(); 
     } 
    } 
} 
+0

어떤 .NET 버전을 목표로합니까? –

+0

안녕 하시다면, BlockingCollection 을 사용하여 [이 링크] (https://stackoverflow.com/questions/733793/implementing-the-producer-consumer-pattern-in-products)에서 간단한 Producer-Consumer 패턴을 구현하는 작업 예제가 있습니다. c-sharp/47179576 # 47179576). –

답변

28

코드가 그보다 오래되었습니다. .NET 2.0이 나올 때까지 약간 시간을 썼습니다. 생산자/소비자 큐의 개념하지만 그보다 오래된 방법 :)입니다

네, 코드는 지금까지 내가 알고 있어요으로 안전합니다 -하지만 몇 가지 결함이 있습니다

  • 을 그것은 비 제네릭입니다. 현대 버전은 확실하게 일반적인 것입니다.
  • 대기열을 중지 할 방법이 없습니다. 대기열을 중지하는 간단한 방법 (모든 소비자 스레드가 종료되도록)은 대기열에 넣을 수있는 "작업 중지"토큰을 갖는 것입니다. 그런 다음 스레드가있는만큼의 토큰을 추가합니다. 또는 중지하려는 플래그가 별도로 있습니다. (대기열의 현재 작업을 모두 마치기 전에 다른 스레드를 중지 할 수 있습니다.)
  • 작업이 매우 작 으면 한 번에 하나의 작업을 소비하는 것이 가장 효율적이지 않을 수 있습니다.

솔직히 말해 코드 뒤에있는 아이디어가 코드 자체보다 중요합니다.

+0

펄싱이 필연적으로 소비자를 깨우지는 않는다는 것을 알고 있지만, 이론상 프로듀서 만이 무한하게 실행할 수 있습니다. 또한 측정 된 모니터 대기/펄스는 이벤트 대기보다 성능상의 이점이 없습니다 – TakeMeAsAGuest

+0

@TakeMeAsAGuest : 첫 번째 부분에 대해 어떤 의미인지는 확실하지 않습니다. 스레드가 모니터에서 기다리고 있지만 펄스가 아무 것도하지 않은 경우를 본 적이 있습니까? ? 성능에 관해서는 고려해야 할 많은 시나리오 (하드웨어, 소프트웨어, 대기 스레드 수 등)가 많이 있습니다. 조 더피 (Joe Duffy)의 저서에서 몇 가지 참조를 찾을 수 있는지 보겠습니다 ... –

23

다음 코드 조각을 수행 할 수 있습니다. 그것은 일반적이고, 널 (또는 어떤 플래그를 사용하든)을 큐에 넣어서 작업자 스레드가 종료하도록 지시하는 메소드를 가지고 있습니다.

코드는 여기에서 가져온 것입니다 http://www.albahari.com/threading/part4.aspx#_Wait_and_Pulse

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Threading; 

namespace ConsoleApplication1 
{ 

    public class TaskQueue<T> : IDisposable where T : class 
    { 
     object locker = new object(); 
     Thread[] workers; 
     Queue<T> taskQ = new Queue<T>(); 

     public TaskQueue(int workerCount) 
     { 
      workers = new Thread[workerCount]; 

      // Create and start a separate thread for each worker 
      for (int i = 0; i < workerCount; i++) 
       (workers[i] = new Thread(Consume)).Start(); 
     } 

     public void Dispose() 
     { 
      // Enqueue one null task per worker to make each exit. 
      foreach (Thread worker in workers) EnqueueTask(null); 
      foreach (Thread worker in workers) worker.Join(); 
     } 

     public void EnqueueTask(T task) 
     { 
      lock (locker) 
      { 
       taskQ.Enqueue(task); 
       Monitor.PulseAll(locker); 
      } 
     } 

     void Consume() 
     { 
      while (true) 
      { 
       T task; 
       lock (locker) 
       { 
        while (taskQ.Count == 0) Monitor.Wait(locker); 
        task = taskQ.Dequeue(); 
       } 
       if (task == null) return;   // This signals our exit 
       Console.Write(task); 
       Thread.Sleep(1000);    // Simulate time-consuming task 
      } 
     } 
    } 
} 
+3

이것은 지금까지 생산자 소비자 패턴을 가장 잘 구현 한 것입니다. 최근 멀티 스레드 응용 프로그램에서 이것을 사용했으며 1000-1500 스레드에서도 원활하게 작동했습니다. –

+0

나는 C# 기술에 녹슬지 않아서 무언가를 놓쳤다 고 확신하지만,이 예제는 소비 된'task' 메소드를 호출하지 않는다. (참조 된 [비 - 일반 코드] (http : // www.albahari.com/threading/part4.aspx#%5FWait%5Fand%5FPulse) 소비자의'Action' 델리게이트를 저장하고 호출합니다. 그렇다면 메소드 호출을 보여주지 않으면이 generic을 만드는 것은 무엇입니까? 소비 된 일? (이 예제를 참조한 구현물과 비교하여이 예제를 둘러보기 위해 노력하고 있습니다.) – Inactivist

+2

@Inactivist 액션 을 ctor에 추가하고 _action 필드로 저장 한 다음 Dequeue()를 수행 한 직후에이를 호출 할 수 있습니다. _action 태스크); – Marcus

1

경고 : 당신이 코멘트를 읽으면, 내 대답은

가능한 교착 상태에있다 : 잘못 이해하게 될 것입니다 당신의 코드.

는 다음과 같은 경우는, 명확성을 위해, 나는 단일 스레드 방식을 사용 상상하지만, 수면 멀티 스레드로 쉽게 변환 할 수 있어야 :이하지 않는 경우 알려 주시기

// We create some actions... 
object locker = new object(); 

Action action1 =() => { 
    lock (locker) 
    { 
     System.Threading.Monitor.Wait(locker); 
     Console.WriteLine("This is action1"); 
    } 
}; 

Action action2 =() => { 
    lock (locker) 
    { 
     System.Threading.Monitor.Wait(locker); 
     Console.WriteLine("This is action2"); 
    } 
}; 

// ... (stuff happens, etc.) 

// Imagine both actions were running 
// and there's 0 items in the queue 

// And now the producer kicks in... 
lock (locker) 
{ 
    // This would add a job to the queue 

    Console.WriteLine("Pulse now!"); 
    System.Threading.Monitor.Pulse(locker); 
} 

// ... (more stuff) 
// and the actions finish now! 

Console.WriteLine("Consume action!"); 
action1(); // Oops... they're locked... 
action2(); 

마십시오 어떤 의미.

이것이 확인되면 질문에 대한 대답은 "아니오, 안전하지 않습니다"입니다.) 도움이되기를 바랍니다.

+0

모든 소비자가 자물쇠를 벗어 났을 때 항목이 대기열에 추가 될 수 있기 때문에 원래 포스터의 코드에 교착 상태가 표시되지 않습니다 (소비자가 다음에 자물쇠를 획득 할 때 그 사실을 관찰 할 때 대기열은 비어 있지 않으므로 대기 할 필요가 없습니다.) 또는 대기 중입니다 (이 경우 펄스는 소비자를 깨울 수 있습니다). – supercat

+0

나는 내가 무엇을 생각하고 있었는지에 대한 대부분의 정보를 잊어 버렸음을 인정해야한다. 다시 읽어 보면 문제는 while 회 돌이라고 생각합니다. 대기열에 무언가가있을 때까지 소비자 스레드를 잠근 상태로 유지하여 제작자가 잠그고 대기하는 것을 방지합니다. 그게 말이 되니? – DiogoNeves

+1

'Monitor.Wait()'메쏘드는 어떤 다른 쓰레드가 그것을 기다릴 때까지 기다리는 자물쇠를 해제합니다. 소비자 쓰레드는'Consume' 메소드 내에서 차단 될 것이지만, 그것은 프로듀서가 그것을 먹이지 못하게하지는 않을 것입니다. 가장 큰 위험은 소비자가 기대하는 모든 데이터를 생성하기 전에 생산자가 종료하면 소비자는 결코 도착하지 않을 물건을 영원히 기다릴 것입니다. 그것은 예를 들면 다음과 같이 처리 될 수있다. 'AllDone' 플래그를 가지고 있습니다. – supercat

13

위의 코드와 위의 article series에서 Monitor.Wait/Pulse가 어떻게 작동하는지 (그리고 일반적으로 스레드에 대해 많이) 배웠습니다. 존 (Jon)이 말했듯이, 그것은 가치가 있으며 실제로 안전하고 적용 가능합니다.

그러나, 현재로.NET 4에는 producer-consumer queue implementation in the framework이 있습니다. 나는 단지 그것을 나 자신을 발견했다. 그러나 지금까지 내가 필요한 모든 것을한다.