2011-05-14 8 views
1

나는 리소스마다 프로듀서/소비자 패턴을 구현하려고합니다. 따라서 각 스레드는 관련된 하나의 리소스를 갖습니다. 예를 들어, 각 작업에서 결과를 쓸 때 StreamWriter이 필요한 작업 대기열이있을 수 있습니다. 각 작업에는 매개 변수가 전달되어야합니다.생산자 - 리소스가있는 소비자

조셉 알바하리 (Joseph Albahari)의 구현으로 시작했습니다 (제 수정 된 버전은 아래 참조).

T 자원이다 Action<T>Action 큐의 큐를 대체하고, Action의 스레드와 관련된 리소스를 전달한다. 그러나 이것으로 인해 매개 변수를 Action에 전달하는 방법의 문제가 생깁니다. 분명히 Action은 대리자로 바꿔야하지만 이는 작업이 대기열에 들어갈 때 ( ProducerConsumerQueue 클래스 외부에서) 매개 변수를 전달하는 방법의 문제를 남깁니다. 이 작업을 수행하는 방법에 대한 아이디어가 있습니까? ProducerConsumerQueue<T>에서

class ProducerConsumerQueue<T> 
    { 
     readonly object _locker = new object();    
     Thread[] _workers; 
     Queue<Action<T>> _itemQ = new Queue<Action<T>>(); 

     public ProducerConsumerQueue(T[] resources) 
     { 
      _workers = new Thread[resources.Length]; 

      // Create and start a separate thread for each worker 
      for (int i = 0; i < resources.Length; i++) 
      { 
       Thread thread = new Thread(() => Consume(resources[i])); 
       thread.SetApartmentState(ApartmentState.STA); 
       _workers[i] = thread; 
       _workers[i].Start(); 
      } 
     }   

     public void Shutdown(bool waitForWorkers) 
     { 
      // Enqueue one null item per worker to make each exit. 
      foreach (Thread worker in _workers) 
       EnqueueItem(null); 

      // Wait for workers to finish 
      if (waitForWorkers) 
       foreach (Thread worker in _workers) 
        worker.Join(); 
     } 

     public void EnqueueItem(Action<T> item) 
     { 
      lock (_locker) 
      { 
       _itemQ.Enqueue(item);   // We must pulse because we're 
       Monitor.Pulse(_locker);   // changing a blocking condition. 
      } 
     } 

     void Consume(T parameter) 
     { 
      while (true)      // Keep consuming until 
      {         // told otherwise. 
       Action<T> item; 
       lock (_locker) 
       { 
        while (_itemQ.Count == 0) Monitor.Wait(_locker); 
        item = _itemQ.Dequeue(); 
       } 
       if (item == null) return;   // This signals our exit. 
       item(parameter);       // Execute item. 
      } 
     } 
    } 

답변

2

유형 T는 자원이 자원을 포함하는 복합 유형이 될 수 있습니다 일 필요는 없습니다. .NET4를 사용하는 가장 쉬운 방법은 Tuple<StreamWriter, YourParameterType>입니다. 농산물/소비자 대기열은 단지 T을 먹고 뱉어 내기 때문에 Action<T>에서는 자원과 매개 변수를 얻기 위해 속성을 사용할 수 있습니다. Tuple을 사용하는 경우 Item1을 사용하여 리소스를 가져오고 Item2을 사용하여 매개 변수를 가져옵니다.

당신이 .NET4를 사용하지 않는 경우, 프로세스가 비슷하지만 당신이 당신 자신의 클래스를 생성 : 그것은 상황에 과잉 설계 할 수있다 일반적인 만드는 사실

public class WorkItem<T> 
{ 
    private StreamWriter resource; 
    private T parameter; 

    public WorkItem(StreamWriter resource, T parameter) 
    { 
     this.resource = resource; 
     this.parameter = parameter; 
    } 

    public StreamWriter Resource { get { return resource; } } 
    public T Parameter { get { return parameter; } } 
} 

. T를 원하는 유형으로 정의 할 수 있습니다.

또한 참조 용으로 동시 대기열 및 병렬 작업 라이브러리와 같은 사용 사례에 적용될 수있는 .NET4에 포함 된 다중 스레딩을 수행하는 새로운 방법이 있습니다. 또한 세마포어와 같은 전통적인 접근법과 결합 될 수 있습니다.

편집 :

  • 세마포어를 관리하기 위해 제한된 자원
  • 동시 큐에 대한 액세스를 제어 :이 방법으로 계속

    , 여기에 사용하는 방법을 보여줍니다 작은 샘플 클래스입니다 스레드 간 안전하게 리소스

  • 작업 병렬 라이브러리를 사용하는 작업 관리
여기

Processor 클래스 :

public class Processor 
{ 
    private const int count = 3; 
    private ConcurrentQueue<StreamWriter> queue = new ConcurrentQueue<StreamWriter>(); 
    private Semaphore semaphore = new Semaphore(count, count); 

    public Processor() 
    { 
     // Populate the resource queue. 
     for (int i = 0; i < count; i++) queue.Enqueue(new StreamWriter("sample" + i)); 
    } 

    public void Process(int parameter) 
    { 
     // Wait for one of our resources to become free. 
     semaphore.WaitOne(); 
     StreamWriter resource; 
     queue.TryDequeue(out resource); 

     // Dispatch the work to a task. 
     Task.Factory.StartNew(() => Process(resource, parameter)); 
    } 

    private Random random = new Random(); 

    private void Process(StreamWriter resource, int parameter) 
    { 
     // Do work in background with resource. 
     Thread.Sleep(random.Next(10) * 100); 
     resource.WriteLine("Parameter = {0}", parameter); 
     queue.Enqueue(resource); 
     semaphore.Release(); 
    } 
} 

지금 우리는이 같은 클래스를 사용할 수 있습니다로

var processor = new Processor(); 
for (int i = 0; i < 10; i++) 
    processor.Process(i); 

더 이상의 세 가지 작업을 동시에 계획되지 않습니다, 각각 자신의 자신의 StreamWriter 자원을 재활용했습니다.

+0

문제는 각 작업에 자체 StreamWriter를 제공하고 싶지 않다는 것입니다.각 StreamWriter가 작업을 실행할 때 다시 사용할 스레드에 속하기를 원합니다. – Johnny

관련 문제