0

다중 스레드로 채울 수있는 작업 대기열이 있습니다 (ConcurrentQueue<MyJob>). 이 작업을 비동기 적으로 (주 스레드가 아닌) 연속 실행하도록 구현해야하지만 은 동시에 하나의 스레드 만 사용해야합니다.작업의 다중 스레드 대기열

public class ConcurrentLoop { 
    private static ConcurrentQueue<MyJob> _concurrentQueue = new ConcurrentQueue<MyJob>(); 

    private static Task _currentTask; 
    private static object _lock = new object(); 

    public static void QueueJob(Job job) 
    { 
     _concurrentQueue.Enqueue(job); 
     checkLoop(); 
    } 

    private static void checkLoop() 
    { 
     if (_currentTask == null || _currentTask.IsCompleted) 
     { 
      lock (_lock) 
      { 
       if (_currentTask == null || _currentTask.IsCompleted) 
       { 
        _currentTask = Task.Run(() => 
        { 
          MyJob current; 
          while(_concurrentQueue.TryDequeue(out current)) 
           //Do something              
        }); 
       } 
      } 
     } 
    } 
} 

내 생각에이 코드는 문제가 : 나는 이런 식으로 뭔가를 시도했습니다 작업 끝내지 않고 실행하는 경우 (아직 완료로 TryDequeue false를 반환하지만 작업이 표시되지 않은)이 순간에 나는를 얻을 수 새로운 직업, 그것은 실행되지 않습니다. 내가 맞습니까? 그렇다면이 문제를 해결하는 방법

+0

스레드 수를 제한하려는 실제 이유가 있습니까? 대부분의 경우 프레임 워크가 최적화를 처리하도록하는 것이 좋습니다. – konkked

+0

한 번에 하나의 스레드가 실행하려는 코드 블록에 Lock을 사용하십시오. 이렇게하면 문제가 해결됩니다. – Reddy

+0

'ConcurrentQueue'로 잠근다 고요? 나는 거기에 더 좋은 방법이 있다고 믿는다. – xalz

답변

3

문제 진술 문은 producer-consumer처럼 보이며 한 명의 소비자 만 원하는 문제가 있습니다.

이러한 기능을 수동으로 다시 구현할 필요는 없습니다. 대신 BlockingCollection을 사용하는 것이 좋습니다. 내부적으로는 ConcurrentQueue을 사용하고 소비를 위해 별도의 스레드를 사용합니다. 이것은 사용 사례에 적합 할 수도 있고 그렇지 않을 수도 있습니다.

같은 뭔가 :

_blockingCollection = new BlockingCollection<your type>(); // you may want to create bounded or unbounded collection 
_consumingThread = new Thread(() => 
{ 
    foreach (var workItem in _blockingCollection.GetConsumingEnumerable()) // blocks when there is no more work to do, continues whenever a new item is added. 
    { 
     // do work with workItem 
    } 
}); 
_consumingThread.Start(); 

여러 생산자 (작업 또는 스레드)가 _blockingCollection에 아무 문제가 작업 항목을 추가 할 수 있습니다 및 생산자/소비자를 동기화에 대해 걱정할 필요가 없습니다.

작업 완료시 _blockingCollection.CompleteAdding()으로 전화하십시오 (이 방법은 스레드로부터 안전하지 않으므로 모든 제작자를 미리 중지하는 것이 좋습니다). 아마, 소비 스레드를 종료하기 위해 _consumingThread.Join() 어딘가에해야합니다.

0

Microsoft의 Reactive Framework 팀의 Reactive Extensions (NuGet "System.Reactive")를 사용합니다. 그것은 사랑스러운 추상화입니다.

public class ConcurrentLoop 
{ 
    private static Subject<MyJob> _jobs = new Subject<MyJob>(); 

    private static IDisposable _subscription = 
     _jobs 
      .Synchronize() 
      .ObserveOn(Scheduler.Default) 
      .Subscribe(job => 
      { 
       //Do something 
      }); 

    public static void QueueJob(MyJob job) 
    { 
     _jobs.OnNext(job); 
    } 
} 

이 잘 단일 스트림으로 들어오는 모든 작업을 동기화하고 (기본적으로 스레드 풀입니다) Scheduler.Default에의 실행을 밀어하지만 모든 입력을 직렬화 때문에 단 한 번에 발생할 수 있습니다. 이것에 대한 좋은 점은 값 사이에 큰 차이가있을 경우 스레드를 해제한다는 것입니다. 그것은 매우 희박한 해결책입니다.

청소하려면 _jobs.OnCompleted(); 또는 _subscription.Dispose();을 호출하면됩니다.