2011-03-31 7 views
1

웹 서비스를위한 비교적 간단한 "프록시"응용 프로그램을 작성하고 있습니다. 일반적으로 TCP 서버 (비동기 연결)는 클라이언트에서 데이터를 읽고 (문자열) 읽고 해당 콜백 함수의 일부로 두 대기열 (Q1 및 Q2) 중 하나에 데이터를 저장합니다. 다른 스레드는이 대기열의 데이터를 읽고이를 웹 서비스에 전달합니다. Q1의 데이터는 Q2의 데이터보다 우선해야합니다..NET 제작자 - 소비자 질문

저는 프로듀서/소비자 패턴에 대해 읽었습니다. 큐와 관련하여 구현하려고 시도한 것 같습니다. 대기열에 넣기 및 대기열에서 빼는 작업이 서로 다른 스레드에서 일어나기 때문에 내 대기열이 스레드로부터 안전하고 일종의 잠금 메커니즘을 지원해야하는 것은 분명합니다. 이것은 .NET 4.0 응용 프로그램이며 새로운 BlockingCollection 및 ConcurrentQueue 클래스에 대한 문서를 보았습니다. 그러나이 차이가 무엇인지 또는이 시나리오에서 어떻게 구현할 것인지 확실하지 않습니다. 누구든지 그걸 좀 더 밝힐 수 있습니까? 고맙습니다!

답변

2

나는 다음 클래스처럼 그것을 할 것입니다. 대기열 중 하나에 항목을 추가 할 때 Enqueue()으로 전화하면됩니다. 이 메서드는 항상 (거의) 즉시 반환합니다. 다른 스레드에서는 항목을 소비 할 준비가되었을 때 Dequeue()으로 전화를 겁니다. 우선 순위가 높은 대기열에서 먼저 가져옵니다. 현재 대기열에서 항목을 사용할 수없는 경우 호출이 차단됩니다. 생산이 끝나면 Complete()으로 전화하십시오. 호출이 이루어지고 두 대기열이 모두 비어지면 Dequeue()으로의 다음 호출 (또는 현재 차단 된 호출)은 InvalidOperationException을 던집니다.

생산자가 소비자보다 오랜 기간 동안 빠를 수 있다면 대기열 (new BlockingCollection<T>(capacity))을 묶어야합니다. 그러나이 경우 낮은 우선 순위 항목과 높은 우선 순위 항목을 모두 생성하는 스레드가 하나만있는 경우 우선 순위가 높은 항목은 우선 순위가 낮은 항목을 기다려야 할 수 있습니다. 우선 순위가 높은 항목을 생성하는 스레드와 우선 순위가 낮은 항목을 생성하는 스레드가 하나씩있어 문제를 해결할 수 있습니다. 또는 우선 순위가 높은 대기열 만 바인딩하면 한 번에 100 만 개의 우선 순위가 낮은 항목을 얻을 수 없기를 바랍니다.

class Worker<T> 
{ 
    BlockingCollection<T> m_highPriorityQueue = new BlockingCollection<T>(); 
    BlockingCollection<T> m_lowPriorityQueue = new BlockingCollection<T>(); 

    public void Enqueue(T item, bool highPriority) 
    { 
     BlockingCollection<T> queue; 
     if (highPriority) 
      queue = m_highPriorityQueue; 
     else 
      queue = m_lowPriorityQueue; 

     queue.Add(item); 
    } 

    public T Dequeue() 
    { 
     T result; 

     if (!m_highPriorityQueue.IsCompleted) 
     { 
      if (m_highPriorityQueue.TryTake(out result)) 
       return result; 
     } 

     if (!m_lowPriorityQueue.IsCompleted) 
     { 
      if (m_lowPriorityQueue.TryTake(out result)) 
       return result; 
     } 

     if (m_highPriorityQueue.IsCompleted && m_lowPriorityQueue.IsCompleted) 
      throw new InvalidOperationException("All work is done."); 
     else 
     { 
      try 
      { 
       BlockingCollection<T>.TakeFromAny(
        new[] { m_highPriorityQueue, m_lowPriorityQueue }, 
        out result); 
      } 
      catch (ArgumentException ex) 
      { 
       throw new InvalidOperationException("All work is done.", ex); 
      } 

      return result; 
     } 
    } 

    public void Complete() 
    { 
     m_highPriorityQueue.CompleteAdding(); 
     m_lowPriorityQueue.CompleteAdding(); 
    } 
} 
+0

대단히 감사합니다. 나는 정보와 제안에 정말로 감사한다! – user685869

0

BlockingCollection은 기본적으로 ConcurrentQueue를 사용합니다. 응용 프로그램에 적합해야합니다. F #을 사서함 및 비동기 블록과 함께 사용하면 더 쉬울 수도 있습니다. 나는 공통 구현의 샘플 포스트를 좀 더 일찍 만들었다.

Map/reduce with F# Agent or MailboxProcessor

관련 문제