8

을 사용하여 한 번에 n 개의 항목 만 처리 Windows 서비스에서이 모든 일이 발생합니다.작업 병렬 라이브러리

나는 처리 대기중인 항목을 보유하고있는 Queue<T> (실제로는 ConcurrentQueue<T>)입니다. 그러나 한 번에 하나씩 만 처리하고 싶지는 않습니다. n 항목을 동시에 처리하고 싶습니다. 여기서 n은 구성 가능한 정수입니다.

작업 병렬 라이브러리를 사용하여 어떻게해야합니까?

TPL은 동시 처리를 위해 개발자를 대신하여 콜렉션을 분할하지만 그 기능이 필자인지 확신 할 수 없습니다. 나는 멀티 쓰레딩과 TPL에 익숙하지 않다.

답변

4

ConcurrentQueue<T> 대신 BlockingCollection<T>을 사용하면 임의의 수의 소비자 스레드를 시작하고 Take 메서드를 BlockingCollection으로 사용할 수 있습니다. 컬렉션이 비어있는 경우 Take 메서드는 항목 추가를 기다리는 호출자 스레드에서 자동으로 차단됩니다. 그렇지 않으면 스레드가 모든 큐 항목을 병렬로 사용합니다. 그러나 당신의 질문으로 TPL의 사용을 언급 했으니 에 BlockingCollection 체크 this 체크와 함께 사용할 때 약간의 문제가 있음이 밝혀졌습니다. 그래서 소비자 스레드 생성을 관리해야합니다. new Thread(/*consumer method*/) 또는 new Task() ...

+0

BlockingCollection은 큐의 목적을 무효화합니다. 반복되는 동안 블로킹 컬렉션에서 항목을 제거 할 수 없습니다. –

+2

그 대신 [GetConsumingEnumerable] (http://msdn.microsoft.com/en-us/library/dd287186.aspx)을 사용할 수 있습니다. 'foreach (Item item in _collection.GetConsumingEnumerable())'도 컬렉션이 비어있는 경우 항목 추가를 기다리고 차단합니다. –

4

여기에 TaskFactory에 대한 확장 방법을 만드는 것이 좋습니다.

public static class TaskFactoryExtension 
{ 
    public static Task StartNew(this TaskFactory target, Action action, int parallelism) 
    { 
     var tasks = new Task[parallelism]; 
     for (int i = 0; i < parallelism; i++) 
     { 
      tasks[i] = target.StartNew(action); 
     } 
     return target.StartNew(() => Task.WaitAll(tasks)); 
    } 
} 

그러면 호출 코드는 다음과 같이 보입니다.

ConcurrentQueue<T> queue = GetQueue(); 
int n = GetDegreeOfParallelism(); 
var task = Task.Factory.StartNew(
() => 
    { 
    T item; 
    while (queue.TryDequeue(out item)) 
    { 
     ProcessItem(item); 
    } 
    }, n); 
task.Wait(); // Optionally wait for everything to finish. 

Parallel.ForEach을 사용한 또 다른 아이디어입니다. 이 접근법의 문제점은 병렬 처리 수준이 반드시 존중되지 않을 수도 있다는 것입니다. 귀하는 절대 금액이 아니라 허용되는 최대 금액만을 표시합니다.

ConcurrentQueue<T> queue = GetQueue(); 
int n = GetDegreeOfParallelism(); 
Parallel.ForEach(queue, new ParallelOptions { MaxDegreeOfParallelism = n }, 
    (item) => 
    { 
    ProcessItem(item);  
    }); 
1

또한 직접 ConcurrentQueue를 사용하는 대신 BlockingCollection를 사용하는 것이 좋습니다 것입니다. 생성자에서 maxConcurrent 동시에 처리 될 것입니다 얼마나 많은 요청을 정의하는

public class QueuingRequestProcessor 
{ 
    private BlockingCollection<MyRequestType> queue; 

    public void QueuingRequestProcessor(int maxConcurrent) 
    { 
    this.queue = new BlockingCollection<MyRequestType>(maxConcurrent); 

    Task[] consumers = new Task[maxConcurrent]; 

    for (int i = 0; i < maxConcurrent; i++) 
    { 
     consumers[i] = Task.Factory.StartNew(() => 
     { 
     // Will wait when queue is empty, until CompleteAdding() is called 
     foreach (var request in this.queue.GetConsumingEnumerable()) 
     { 
      Process(request); 
     } 
     }); 
    } 
    } 

    public void Add(MyRequest request) 
    { 
    this.queue.Add(request); 
    } 

    public void Stop() 
    { 
    this.queue.CompleteAdding(); 
    } 

    private void Process(MyRequestType request) 
    { 
    // Do your processing here 
    } 
} 

참고 :

다음은 예입니다.

관련 문제