2011-12-31 2 views
0

이 프로그램은 모니터(Monitor)를 사용해 동기화하며, 생산자와 소비자가 공유 버퍼를 읽고 씁니다. 소비자를 2개 더 추가하려면 어떻게 해야 합니까? 그렇게 해도 제대로 동작합니까? (제목: 멀티스레드 프로그램에 소비자를 추가하는 방법)

// Synchronized.cs 
    // Showing multiple threads modifying a shared object with synchronization. 

    using System; 
    using System.Threading; 

    namespace Synchronized 
    { 
     // Single-slot buffer shared by producer and consumer threads.
     // Writers block while the slot is occupied and readers block while
     // it is empty. Safe for any number of producers and consumers.
     public class HoldIntegerSynchronized
     {
      // Private lock object: nothing outside this class can acquire
      // it, so external code cannot interfere with the wait/pulse
      // protocol the way it could when locking on "this".
      private readonly object gate = new object();

      // the single value slot shared by producer and consumer threads
      private int buffer = -1;

      // number of occupied slots; always 0 or 1 while the lock is free
      private int occupiedBufferCount = 0;

      // Property Buffer.
      // get: blocks until a value is available, consumes it and
      //      returns it.
      // set: blocks until the slot is free, then stores the value.
      public int Buffer
      {
       get
       {
        // lock releases the monitor even if an exception is thrown
        lock (gate)
        {
         // Re-check the condition after every wake-up: another
         // consumer may have emptied the slot between the pulse and
         // this thread re-acquiring the lock.
         while (occupiedBufferCount == 0)
         {
          Console.WriteLine(
           Thread.CurrentThread.Name + " tries to read.");

          DisplayState("Buffer empty. " +
           Thread.CurrentThread.Name + " waits.");

          Monitor.Wait(gate);
         }

         // the slot is free again for a producer
         --occupiedBufferCount;

         DisplayState(
          Thread.CurrentThread.Name + " reads " + buffer);

         // Producers and consumers wait on the same monitor, so a
         // single Pulse could wake another consumer and leave a
         // waiting producer asleep. Wake everyone; each re-checks
         // its own condition.
         Monitor.PulseAll(gate);

         // The return value is evaluated while the lock is still
         // held, so a producer cannot overwrite it first.
         return buffer;
        }

       } // end get

       set
       {
        lock (gate)
        {
         // Re-check after every wake-up: another producer may have
         // refilled the slot before this thread got the lock back.
         while (occupiedBufferCount == 1)
         {
          Console.WriteLine(
           Thread.CurrentThread.Name + " tries to write.");

          DisplayState("Buffer full. " +
           Thread.CurrentThread.Name + " waits.");

          Monitor.Wait(gate);
         }

         // store the new value and mark the slot as occupied
         buffer = value;
         ++occupiedBufferCount;

         DisplayState(
          Thread.CurrentThread.Name + " writes " + buffer);

         // wake all waiters (see the note in the getter)
         Monitor.PulseAll(gate);
        }

       } // end set

      } // end property Buffer

      // Writes one table row: the operation text, the current buffer
      // value and the occupied count. Takes the (re-entrant) lock so
      // the two fields are read as a consistent pair even when called
      // from outside the property accessors.
      public void DisplayState(string operation)
      {
       lock (gate)
       {
        Console.WriteLine("{0,-35}{1,-9}{2}\n",
         operation, buffer, occupiedBufferCount);
       }
      }

     } // end class HoldIntegerSynchronized

     // Producer: its Produce method is the entry point of a thread
     // that stores the values 1 through 10 in the shared buffer.
     class Producer
     {
      private HoldIntegerSynchronized target;
      private Random sleepSource;

      // shared: buffer to write to; random: source of sleep intervals
      public Producer(
       HoldIntegerSynchronized shared, Random random)
      {
       sleepSource = random;
       target = shared;
      }

      // Writes 1..10 to the shared buffer, sleeping a random interval
      // (1 to 2999 milliseconds) before each write, then reports that
      // the thread is terminating.
      public void Produce()
      {
       int next = 1;

       while (next <= 10)
       {
        int pause = sleepSource.Next(1, 3000);
        Thread.Sleep(pause);

        // blocks inside the setter while the buffer is still full
        target.Buffer = next;
        next++;
       }

       string name = Thread.CurrentThread.Name;

       Console.WriteLine(name +
        " done producing.\nTerminating " +
        name + ".\n");

      } // end method Produce

     } // end class Producer

     // Consumer: its Consume method is the entry point of a thread
     // that reads ten values from the shared buffer and prints their
     // total.
     class Consumer
     {
      private HoldIntegerSynchronized source;
      private Random sleepSource;

      // shared: buffer to read from; random: source of sleep intervals
      public Consumer(
       HoldIntegerSynchronized shared, Random random)
      {
       sleepSource = random;
       source = shared;
      }

      // Reads the shared buffer ten times, sleeping a random interval
      // (1 to 2999 milliseconds) before each read, then prints the sum
      // of the values read and reports that the thread is terminating.
      public void Consume()
      {
       // the thread running this method; used for the final report
       Thread current = Thread.CurrentThread;

       int total = 0;
       int remaining = 10;

       while (remaining > 0)
       {
        int pause = sleepSource.Next(1, 3000);
        Thread.Sleep(pause);

        // blocks inside the getter while the buffer is empty
        int received = source.Buffer;
        total += received;

        remaining--;
       }

       Console.WriteLine(current.Name +
        " read values totaling: " + total +
        ".\nTerminating " + current.Name + ".\n");

      } // end method Consume

     } // end class Consumer

     // Program entry point: wires one producer and one consumer to a
     // shared buffer and runs each on its own thread.
     class SharedCell
     {
      // Builds the shared buffer, the workers and their threads,
      // prints the table header and starts the threads.
      static void Main(string[] args)
      {
       // one Random instance is handed to every worker
       Random random = new Random();

       // the buffer all workers operate on
       HoldIntegerSynchronized holdInteger =
        new HoldIntegerSynchronized();

       Producer producer = new Producer(holdInteger, random);
       Consumer consumer = new Consumer(holdInteger, random);

       // NOTE(review): these two consumers are constructed, but no
       // thread is ever created or started for them, so they do not
       // run. Each Consumer reads ten values and the single Producer
       // writes only ten, so starting them as-is would leave
       // consumers waiting for values that never arrive.
       Consumer consumer2 = new Consumer(holdInteger, random);
       Consumer consumer3 = new Consumer(holdInteger, random);

       // column heads followed by the initial buffer state
       Console.WriteLine("{0,-35}{1,-9}{2}\n",
        "Operation", "Buffer", "Occupied Count");
       holdInteger.DisplayState("Initial state");

       // the thread names appear in every line the workers print
       Thread producerThread = new Thread(producer.Produce);
       producerThread.Name = "Producer";

       Thread consumerThread = new Thread(consumer.Consume);
       consumerThread.Name = "Consumer";

       // producer first, then consumer
       producerThread.Start();
       consumerThread.Start();

      } // end method Main

     } // end class SharedCell
    } // end end!. 

답변

1

이런 상황에서는 조건 변수(.NET에서는 Monitor.Pulse()와 Monitor.PulseAll())를 사용해 큐에서 대기하는, 스레드에 안전한 블로킹 큐 클래스를 직접 작성하는 편이 좋다는 것을 알게 되었습니다.

이 솔루션은 요소를 추가하거나 제거하는 스레드가 몇 개이든 안전합니다.

예 :

using System; 
using System.Threading; 


namespace MCSharp { 


    /** <summary> 
     A thread safe, blocking queue.</summary> 
     <remarks> 
     Every public member takes <c>waitobject</c> before touching the underlying 
     <c>LinkedQueue</c>. Two monitor objects are used: <c>waitobject</c> guards the 
     queue and is what <c>Dequeue</c> callers wait on for new items; <c>emptyobject</c> 
     is what <c>WaitForEmpty</c> callers wait on. Where both are held, 
     <c>waitobject</c> is always acquired first.</remarks> 
    */ 
    public class MessageQueue<T> { 


     // Underlying FIFO storage; only accessed while waitobject is held.
     private LinkedQueue<T> messagequeue=new LinkedQueue<T>(); 
     // Lock guarding the queue and the waiter count; Dequeue waits on it and Enqueue pulses it.
     private Object waitobject=new Object(); 
     // Number of threads currently inside Monitor.Wait in Dequeue.
     private Int32 numwaitingthreads=0; 
     // Monitor that WaitForEmpty waits on; Dequeue pulses it each time an item is removed.
     private Object emptyobject=new Object(); 


     /** <summary> 
      Returns the number of items currently waiting in the queue.</summary> 
      <remarks> 
      The value is read under the queue lock; it may be out of date as soon as 
      the property returns.</remarks> 
     */ 
     public Int32 MessageCount { 

      get { lock (waitobject) return messagequeue.Count; } 

     } 


     /** <summary> 
      Returns the number of threads currently blocked in <c>Dequeue</c> waiting 
      for items to be added to the queue.</summary> 
     */ 
     public Int32 ThreadCount { 

      get { lock (waitobject) return numwaitingthreads; } 

     } 


     /** <summary> 
      Creates a new, empty queue.</summary> 
     */ 
     public MessageQueue() { } 


     /** <summary> 
      Adds a new item to the back of the queue and wakes one thread 
      blocked in <c>Dequeue</c>, if there is one.</summary> 
      <param name="message"> 
      The item to add to the queue.</param> 
     */ 
     public void Enqueue (T message) { 

      lock (waitobject) { 

       messagequeue.Enqueue(message); 

       // One item was added, so waking a single waiting consumer is enough.
       Monitor.Pulse(waitobject); 

      } 

     } 


     /** <summary> 
      Removes an item from the front of the queue.</summary> 
      <remarks> 
      If there is currently no item at the front of the queue the thread will block 
      until there is one, and then return with that item. Each removal also 
      wakes every thread blocked in <c>WaitForEmpty</c>.</remarks> 
      <returns> 
      The item from the front of the queue.</returns> 
     */ 
     public T Dequeue() { 

      lock (waitobject) { 

       // Loop rather than test once: another consumer may take the item
       // between the pulse and this thread re-acquiring the lock.
       while (messagequeue.Count==0) { 

        numwaitingthreads++; 

        Monitor.Wait(waitobject); 

        numwaitingthreads--; 

       } 

       // Nested acquisition in the order waitobject -> emptyobject,
       // the same order WaitForEmpty uses.
       lock (emptyobject) { 

        // Pulsed on every removal, not only when the queue becomes empty;
        // WaitForEmpty re-checks the count itself after waking.
        Monitor.PulseAll(emptyobject); 

        return messagequeue.Dequeue(); 

       } 

      } 

     } 


     /** <summary> 
      Waits for the queue to empty.</summary> 
      <remarks> 
      The calling thread blocks until the queue's <see cref="MCSharp.MessageQueue{T}.MessageCount"> 
      message count</see> is zero.</remarks> 
     */ 
     public void WaitForEmpty() { 

      // Re-check the count after every wake-up.
      while (true) { 

       Monitor.Enter(waitobject); 

       try { 

        if (messagequeue.Count==0) { 

         // The finally below releases waitobject before returning.
         return; 

        } 

        // Take emptyobject BEFORE releasing waitobject: Dequeue needs both
        // locks to pulse, so it cannot issue its PulseAll between the check
        // above and the Wait below.
        Monitor.Enter(emptyobject); 

       } finally { 

        Monitor.Exit(waitobject); 

       } 

       try { 

        // Releases emptyobject while waiting; returns after a Dequeue pulses it.
        Monitor.Wait(emptyobject); 

       } finally { 

        Monitor.Exit(emptyobject); 

       } 

      } 

     } 


    } 


} 

위 코드에서 사용한 "LinkedQueue" 클래스는 다음과 같습니다:

using System; 


namespace MCSharp { 


    /** <summary>
     A first-in, first-out queue stored as a singly linked list.</summary>
     <remarks>
     Offered as an alternative to the .NET built-in queue, which uses a
     dynamically-resizing array for its data storage. This class performs
     no locking of its own.</remarks>
    */
    public class LinkedQueue<T> {


     /** <summary>
      One link of the chain: a stored item and a reference to the link behind it.</summary>
     */
     private class Node {

      public T Item;
      public Node Next;

      public Node (T item) {

       Item=item;
       Next=null;

      }

     }


     private Node head=null;   // front of the queue; null when the queue is empty
     private Node tail=null;   // rear of the queue; null when the queue is empty
     private Int32 count=0;    // number of links currently in the chain


     /** <summary>
      Returns the number of items in the queue.</summary>
     */
     public Int32 Count {

      get { return count; }

     }


     /** <summary>
      Creates a new, empty queue.</summary>
     */
     public LinkedQueue() { }


     /** <summary>
      Adds an item to the rear of the queue.</summary>
      <param name="item">
      The item to add to the queue.</param>
     */
     public void Enqueue (T item) {

      Node added=new Node(item);

      if (head==null) {

       // Empty queue: the new link is the front as well as the rear.
       head=added;

      } else {

       tail.Next=added;

      }

      tail=added;
      count++;

     }


     /** <summary>
      Removes and returns the item at the front of the queue.</summary>
      <returns>
      The item that was at the front of the queue.</returns>
      <exception cref="System.InvalidOperationException">
      The queue is empty.</exception>
     */
     public T Dequeue() {

      if (count==0) throw new InvalidOperationException();

      Node front=head;

      head=front.Next;

      // The last link was just removed, so there is no rear either.
      if (head==null) tail=null;

      count--;

      return front.Item;

     }


    }


} 
관련 문제