2011-01-18 3 views
5

C# Queue을 사용하여 메시지 큐를 구현했습니다. while 루프로 처리하기 위해 대기열에서 사용 가능한 메시지를 가져 오는 사용자가 하나 뿐인 것입니다. 또한 대기열에 메시지를 넣는 제작자는 단 한 명뿐입니다.메시지 큐 생각

위의 메시지 대기열에 lock이있어 소비자와 생산자가 동시에 대기열에 액세스 할 수 없습니다.

제 질문은 lock이 필요합니까? QueueCount 속성을 AFTER 후에 실제로 추가하고 소비자가 검색하기 전에 Count을 확인하면 소비자는 lock이 없어도 완전한 메시지 항목을 받아야합니다. 권리? 따라서 우리는 부분적인 메시지 항목 문제에 직면하지 않을 것입니다. 그럼 우리는 그걸 없애 버릴 수 있어요 lock?

lock은 시스템 속도를 떨어 뜨리고 우리는 매우 생산자가 많기 때문에 검색 스레드가 잠시 중단되는 것을 볼 수 있습니다.

편집 :

불행하게도 우리는 닷넷 3.5을 사용하고 있습니다.

+1

는 .NET 4를 사용할 수있는 경우를, 그것은'BlockingCollection 가'과'ConcurrentQueue'는 성능으로, HTTP를 참조하십시오 //geekswithblogs.net/BlackRabbitCoder/archive/2010/06/07/c-system.collections.concurrent.concurrentqueue-vs.-queue.aspx –

+0

성능 문제가있는 경우 몇 가지 코드를 게시 할 수 있습니까? 또한 제작자가 무겁다 고 말할 때 초당 항목 수는 얼마나됩니까? –

답변

10

실제 문제는 큐 삽입 또는 큐 제거가 발생하는 동안 큐의 내부 데이터 구조가 변경 될 수 있으며이 기간 동안 데이터 구조가 다른 스레드의 관점에서 불확정 상태에 있다는 것입니다.

예를 들어, 대기열에 추가하려면 내부 데이터 구조가 확장되어야하고 새 구조가 만들어지고 이전 항목은 이전 구조를 새 구조로 복사해야합니다. 이 프로세스에는 많은 단계가 필요하며 작업이 완료되지 않은 다른 스레드가 큐에 액세스하는 것이 위험 할 수 있습니다.

따라서 enqueue \ dequeue 중에 이러한 작업을 논리적으로 원자 적으로 보이도록 잠글 수 있습니다.

닷넷 4.0에서 새로운 ConcurrentQueue 클래스를 사용해보십시오. 비 잠금 알고리즘을 사용하기 때문에 더 나은 성능 특성을 가질 수 있습니다.

4

Queue<T>을 사용하는 경우 잠금 장치가 필요합니다. 이를 ConcurrentQueue<T>으로 바꾸면 쉽게 제거 할 수 있지만이 코드를 BlockingCollection<T>으로 바꾸면이 코드를 단순화하는 것이 좋습니다.

이것은 소비자가 자물쇠를 제거하고 점검하는 동안 collection.GetConsumingEnumerable()에서 단 하나의 foreach를 수행 할 수있게합니다. 제작자는 자물쇠를 제거하고 필요에 따라 항목을 추가 할 수 있습니다. 그것은 또한 당신이 현재 "매우 많은 프로듀서"를 언급했기 때문에 쉽게 여러 프로듀서를 사용하여 확장 할 수 있습니다.

0

잠금을 설정해야합니다. 클래스는 스레드 세이프가 아닙니다. System.CollectionsQueue을 사용하는 경우 편리한 스레드 안전 Queue (System.Collections.Queue.Synchronized()Queue을 반환합니다)이 있습니다.그렇지 않으면, 동기화 Queue<T>.SyncRoot 제공하는 객체를 사용하십시오 : 당신이 .NET 3.5을 사용하는 경우에도

using System.Collections.Generic; 
public static class Q_Example 
{ 
    private readonly Queue<int> q = new Queue<int>(); 
    public void Method1(int val) 
    { 
     lock(q.SyncRoot) 
     { 
      q.EnQueue(val); 
     } 
    } 

    public int Method2() 
    { 
     lock(q.SyncRoot) 
     { 
      return q.Dequeue(); 
     } 
    } 
} 
+0

큐에 대해 읽기 전용입니까? – 5YrsLaterDBA

+0

을 System.Collections.Queue.Synchronized()와 함께 사용하면 더 이상 락커가 필요하지 않습니까? – 5YrsLaterDBA

+0

1. 참조 q가 대상을 변경하지 않기 때문입니다. Readonly는 가리키는 객체/구조체를 수정하지 못하도록하지 않습니다. 2. 비 generic 콜렉션을 반환하므로 .NET에서는 1.1 # – dzendras

0

ConcurrentQueue 사용할 수 있습니다. Reactive Extensions에는 .NET 3.5에 포함 된 Parallel Extensions to .NET 3.5 - .NET 4.0에 포함 된 Task Parallel Library의 선구자가 포함됩니다.

1

아니요, 일관되게 작동하지 않습니다 ... 왜?

의 우리가 두 개의 스레드에서 동시에 호출 할 수하려고하는 두 가지 방법을 분해하자 (하나 & 한 작가 읽기) : 변수의 세 가지의 안전을 위의 코드를 감안할 때

public T Dequeue() 
{ 
    if (this._size == 0) 
    { 
     ThrowHelper.ThrowInvalidOperationException(ExceptionResource.InvalidOperation_EmptyQueue); 
    } 
    T local = this._array[this._head]; 
    this._array[this._head] = default(T); 
    this._head = (this._head + 1) % this._array.Length; 
    this._size--; 
    this._version++; 
    return local; 
} 

public void Enqueue(T item) 
{ 
    if (this._size == this._array.Length) 
    { 
     int capacity = (int) ((this._array.Length * 200L)/100L); 
     if (capacity < (this._array.Length + 4)) 
     { 
      capacity = this._array.Length + 4; 
     } 
     this.SetCapacity(capacity); 
    } 
    this._array[this._tail] = item; 
    this._tail = (this._tail + 1) % this._array.Length; 
    this._size++; 
    this._version++; 
} 

을 경우 (그리고 경우에만) 큐에 충분한 용량이 있습니다. _array, _head 및 _tail 필드는 위의 두 가지 방법 중 하나에서만 수정되지 않았거나 수정되었습니다.

lock()을 제거 할 수없는 이유는 두 가지 방법 모두 _size 및 _version을 수정하기 때문입니다. 논란의 여지는 있지만 _version의 충돌을 무시할 수는 있지만 _size의 충돌로 인해 원치 않는 예측할 수없는 동작이 발생할 수 있습니다.

+0

프로필에 따르면이 코드는 공개 된 것이며 저작권에서 자유 롭습니까? ;) –

+0

@chibacity LMAO - 네, 거기에 몇 가지 주목할만한 예외가 있다고 생각합니다;) 이제 위의 코드를 가져 와서도 위의 라이센스를받을 수 없습니다. 우리는 CLR/프레임 워크와 그것을 사용하는 최선의 방법을 이해하려고 할 때 MS가 우리의 사소한 불편 함을 신경 쓰지 않기를 바라고 있습니다. –

1

둘째, 단일 판독기와 단일 기록기에 대한 잠금없는 대기열을 작성하는 것은 매우 쉽습니다. 이 개념은 매우 원시적 인 예이지만,이 작업을 수행합니다

class LocklessQueue<T> 
{ 
    class Item 
    { 
     public Item Next; 
     bool _valid; 
     T _value; 
     public Item(bool valid, T value) 
     { 
      _valid = valid; 
      _value = value; 
      Next = null; 
     } 
     public bool IsValid { get { return _valid; } } 
     public T TakeValue() 
     { 
      T value = _value; 
      _valid = false; 
      _value = default(T); 
      return value; 
     } 
    } 

    Item _first; 
    Item _last; 

    public LocklessQueue() 
    { 
     _first = _last = new Item(false, default(T)); 
    } 

    public bool IsEmpty 
    { 
     get 
     { 
      while (!_first.IsValid && _first.Next != null) 
       _first = _first.Next; 
      return false == _first.IsValid; 
     } 
    } 

    public void Enqueue(T value) 
    { 
     Item i = new Item(true, value); 
     _last.Next = i; 
     _last = i; 
    } 

    public T Dequeue() 
    { 
     while (!_first.IsValid && _first.Next != null) 
      _first = _first.Next; 

     if (IsEmpty) 
      throw new InvalidOperationException();//queue is empty 

     return _first.TakeValue(); 
    } 
} 
+0

공간을 재 할당하지 않기 때문에 위의 LocklessQueue에 대한 잠금이 필요하지 않습니까? – 5YrsLaterDBA

+0

단일 리더 및 단일 작성자 용이지만 다중 판독기 및 다중 작성자를 독자적으로 sychronize하여 한 번에 하나의 판독기 만 읽고 한 번에 하나의 작성기 만 작성할 수 있도록하는 경우이를 사용할 수 있습니다. 독자와 작가 사이에는 동기화가 필요 없습니다. 권리? – 5YrsLaterDBA

+0

@ 5YrsLaterDBA 올바른,하지만 당신은 내 블로그에이 게시물에 대한 몇 가지 읽을 수 있습니다 : http://csharptest.net/?p=465 –