2010-07-19 7 views
7

System.Collections.Concurrent에는 다중 스레드 환경에서 잘 작동하는 몇 가지 새로운 컬렉션이 있습니다. 그러나 약간 제한적입니다. 항목이 사용 가능해질 때까지 차단되거나 default(T) (TryXXX 메소드)을 반환합니다.비 블로킹 동시 수집?

스레드로부터 안전하지만, 호출 스레드를 차단하는 대신 콜백을 사용하여 하나 이상의 항목을 사용할 수 있음을 알리는 콜렉션이 필요합니다.

내 현재 솔루션은 BlockingCollection을 사용하지만 대리자와 함께 APM을 사용하여 다음 요소를 가져 오는 것입니다. 즉, 컬렉션에서 Take s 인 메서드에 대한 대리자를 만들고 BeginInvoke을 사용하여 해당 대리자를 실행합니다.

불행히도이 작업을 수행하기 위해 수업 시간에 많은 상태를 유지해야합니다. 더 나쁜 것은 클래스가 스레드로부터 안전하지 않다는 것입니다. 단일 스레드에서만 사용할 수 있습니다. 유지 보수성의 한계를 극복하고 있습니다.

제가 여기 꽤 간단한 작업을 수행하는 라이브러리가 있다는 것을 알고 있습니다 (Reactive Framework는 이것들 중 하나입니다). 그러나 버전 외의 참조를 추가하지 않고도 목표를 달성하고 싶습니다. 4 프레임 워크.

내 목표를 달성하는 외부 참조가 필요하지 않은 더 좋은 패턴이 있습니까?


TL; DR :

이 요구 사항을 만족 어떤 패턴이 있습니까 :

는 "나는 다음 요소에 대한 준비 컬렉션을 신호 할 필요가, 그리고 컬렉션이 실행이 그 다음 요소가 도착했을 때 어떤 스레드도 막히지 않고 콜백합니다. "

+0

스레드로부터 안전할까요? 델리게이트가 호출되기 전에 사용할 수있는 항목을 사용할 수 없게되는 것을 멈추게하는 것은 무엇입니까? 그리고 전반적인 목표 (즉, 큐잉 시스템)는 무엇입니까? –

+0

@ Adam 아이템을 소비 할 때의 좋은 점. 대리자는 컬렉션에서 제거 된 항목을 가져옵니다. 그래서 델리게이트의 실행은 아이템이 컬렉션에서'Take'-en이 될 때까지 차단되고 그 아이템은 EndInvoke에 전달 된'객체'입니다. 전반적인 목표는 다소 복잡합니다. 본질적으로 항목을 사용할 수있게 될 때까지 워크 플로를 유휴 상태로 유지해야합니다. 워크 플로우의 실행을 차단할 수 없으므로 항목을 단순히 가져 오면 호출 블록처럼 작동하지 않습니다. 북마크를 만든 다음 확장 프로그램에 전달해야합니다. Extension은 Delegate를 호출 해, 콜백 내의 북마크를 재개합니다. – Will

+0

불행히도 저는 워크 플로우에 대한 경험이 거의 없습니다. 질문에 세부 사항을 추가하면 누군가의 관심을 끌 수 있습니다. :-) –

답변

4

두 가지 가능한 해결책이 있다고 생각합니다. 나는 어느 쪽에도 만족하지는 못하지만 적어도 APM 접근법에 대한 합리적인 대안을 제공합니다.

없이 차단 스레드 귀하의 요구 사항을 충족하지 않는 첫 번째,하지만 난 당신이 콜백을 등록 할 수 있습니다 그들은 라운드 로빈 방식으로 호출되는 것이기 때문에 오히려 우아 생각하지만, 여전히 호출하는 기능을 가지고 Take 또는 TryTake 보통은 BlockingCollection입니다. 이 코드는 항목이 요청 될 때마다 콜백을 강제로 등록합니다. 그것은 수집을위한 신호 메커니즘입니다. 이 접근법에 대한 좋은 점은 Take에 대한 호출이 두 번째 솔루션에서와 마찬가지로 굶어 죽지 않는다는 것입니다.

public class NotifyingBlockingCollection<T> : BlockingCollection<T> 
{ 
    private Thread m_Notifier; 
    private BlockingCollection<Action<T>> m_Callbacks = new BlockingCollection<Action<T>>(); 

    public NotifyingBlockingCollection() 
    { 
     m_Notifier = new Thread(Notify); 
     m_Notifier.IsBackground = true; 
     m_Notifier.Start(); 
    } 

    private void Notify() 
    { 
     while (true) 
     { 
      Action<T> callback = m_Callbacks.Take(); 
      T item = Take(); 
      callback.BeginInvoke(item, null, null); // Transfer to the thread pool. 
     } 
    } 

    public void RegisterForTake(Action<T> callback) 
    { 
     m_Callbacks.Add(callback); 
    } 
} 

둘째는 차단 스레드가 없다는 요구 사항을 충족합니다. 콜백 호출을 스레드 풀로 전송하는 방법에 주목하십시오. 동기식으로 실행되면 잠금 장치가 더 길게 유지되어 병목 현상이 AddRegisterForTake이 될 것이라고 생각하기 때문에이 작업을 수행했습니다. 나는 그것을 철저히 살펴 봤고 라이브가 잠길 수 있다고 생각하지 않는다. (아이템과 콜백 모두 가능하지만 콜백은 결코 실행되지 않는다.) 그러나 스스로 확인하여 검증하려고 할 수도있다. 여기서 유일한 문제는 콜백이 항상 우선시되므로 Take에 대한 호출이 굶어 죽을 수 있다는 것입니다.

public class NotifyingBlockingCollection<T> 
{ 
    private BlockingCollection<T> m_Items = new BlockingCollection<T>(); 
    private Queue<Action<T>> m_Callbacks = new Queue<Action<T>>(); 

    public NotifyingBlockingCollection() 
    { 
    } 

    public void Add(T item) 
    { 
     lock (m_Callbacks) 
     { 
      if (m_Callbacks.Count > 0) 
      { 
       Action<T> callback = m_Callbacks.Dequeue(); 
       callback.BeginInvoke(item, null, null); // Transfer to the thread pool. 
      } 
      else 
      { 
       m_Items.Add(item); 
      } 
     } 
    } 

    public T Take() 
    { 
     return m_Items.Take(); 
    } 

    public void RegisterForTake(Action<T> callback) 
    { 
     lock (m_Callbacks) 
     { 
      T item; 
      if (m_Items.TryTake(out item)) 
      { 
       callback.BeginInvoke(item, null, null); // Transfer to the thread pool. 
      } 
      else 
      { 
       m_Callbacks.Enqueue(callback); 
      } 
     } 
    } 
} 
+0

답변 해 주셔서 감사합니다, 하지만 그 약간은 내가 찾고있는 것이 아닙니다. 이것은 내가 현재하고있는 일이지만, APM이 컬렉션 (당신이 제공 한 코드)에 푸시 된 상태입니다. 내 문제의 핵심은 APM이 내 요구 사항에 맞지 않는다는 것, 그저 구현 한 것입니다. 내 요구 사항은 "다음 요소에 대한 준비가 된 콜렉션에 신호를 보내고 다음 스레드가 차단되지 않은 상태에서 콜렉션이 콜백을 실행하게하려면 어떻게해야합니까?"라는 질문에 대한 해결책을 제공하는 패턴이 필요합니다. – Will

+0

나는 네가 뭘했는지 상상하지 못했다. 그것은 흥미로운 문제입니다. 너무 나쁜'Add'는'virtual'이 아니며, 그렇지 않다면 어떻게 든 알림을 주입 할 수 있습니다. 어쩌면 블로킹 큐 구현 중 하나를 시작점으로 사용할 수 있습니다. 문제는 당신이 그 통지를 전달하는 방법을 신중히해야한다는 것입니다. 그렇지 않으면 다른 소비자가 먼저 항목을 붙잡을 것입니다. 시간이되면 오늘 같이 놀아도 될 것 같습니다. 알아 내면 직접 답변을 게시하십시오. 모르겠지만 ... 펀트하고 다른 라이브러리를 참조하는 것이 더 쉬울 수도 있습니다. –

+0

알림에는 다음 요소가 포함되어야하며 알리미가 제어해야합니다. 아마도 이것이 컬렉션이라는 생각은 잘못된 것일 수 있습니다. 이 메커니즘을 통해서만 다음 항목을 제공 할 수 있으므로 단일 항목에 대해 경합하는 두 명의 옵저버의 문제를 피할 수 있습니다. 다시 말해, 한 관찰자는 메커니즘 A를 사용하여 다른 항목이 콜백에 등록 된 동안 다음 항목을 가져올 수 없습니다 (즉, 'T Pop()'). – Will

3

어때? (이 명명법은 아마도 약간의 작업을 사용할 수 있습니다. 그리고 이것은 테스트되지 않았습니다.)

public class CallbackCollection<T> 
{ 
    // Sychronization object to prevent race conditions. 
    private object _SyncObject = new object(); 

    // A queue for callbacks that are waiting for items. 
    private ConcurrentQueue<Action<T>> _Callbacks = new ConcurrentQueue<Action<T>>(); 

    // A queue for items that are waiting for callbacks. 
    private ConcurrentQueue<T> _Items = new ConcurrentQueue<T>(); 

    public void Add(T item) 
    { 
     Action<T> callback; 
     lock (_SyncObject) 
     { 
      // Try to get a callback. If no callback is available, 
      // then enqueue the item to wait for the next callback 
      // and return. 
      if (!_Callbacks.TryDequeue(out callback)) 
      { 
       _Items.Enqueue(item); 
       return; 
      } 
     } 

     ExecuteCallback(callback, item); 
    } 

    public void TakeAndCallback(Action<T> callback) 
    { 
     T item; 
     lock(_SyncObject) 
     { 
      // Try to get an item. If no item is available, then 
      // enqueue the callback to wait for the next item 
      // and return. 
      if (!_Items.TryDequeue(out item)) 
      { 
       _Callbacks.Enqueue(callback); 
       return; 
      } 
     } 
     ExecuteCallback(callback, item); 
    } 

    private void ExecuteCallback(Action<T> callback, T item) 
    { 
     // Use a new Task to execute the callback so that we don't 
     // execute it on the current thread. 
     Task.Factory.StartNew(() => callback.Invoke(item)); 
    } 
} 
+0

@ Brian의 NotifyingBlockingCollection을 새로 고치고 보았습니다. 그와 같이 보입니다. 동시에 대략 같은 해결책을 찾았습니다. –

+0

네, 우리는 확실히 같은 줄을 따라, 특히 현재 스레드에서 콜백 호출을 얻는 것에 대해 생각했습니다. –

관련 문제