2016-07-12 11 views
0

항목 목록 (200k - 300k)을 처리 중이며 각 항목 처리 시간은 2 - 8 초입니다. 시간을 벌기 위해이 목록을 병렬로 처리 할 수 ​​있습니다. 내가 비동기 상황에있어, 나는 이런 식으로 뭔가를 사용C# 병렬 Foreach + Async

public async Task<List<Keyword>> DoWord(List<string> keyword) 
{ 
    ConcurrentBag<Keyword> keywordResults = new ConcurrentBag<Keyword>(); 
    if (keyword.Count > 0) 
    { 
     try 
     { 
      var tasks = keyword.Select(async kw => 
      { 
       return await Work(kw).ConfigureAwait(false); 
      }); 

      keywordResults = new ConcurrentBag<Keyword>(await Task.WhenAll(tasks).ConfigureAwait(false)); 
     } 
     catch (AggregateException ae) 
     { 
      foreach (Exception innerEx in ae.InnerExceptions) 
      { 
       log.ErrorFormat("Core threads exception: {0}", innerEx); 
      } 
     } 
    } 
    return keywordResults.ToList(); 
} 

키워드 목록은 항상 8 개 요소 (위에서 오는) 따라서 나는이 경우, 제 생각 엔, 8 내 목록 (8)을 처리하지만 포함 7 개의 키워드가 3 초 내에 처리되고 8 번째 키워드가 10 초 내에 처리되는 경우, 8 개의 키워드에 대한 총 시간은 10 일 것입니다 (내가 틀렸다면 나를 수정하십시오). 어떻게 Parallel.Foreach에서 접근 할 수 있습니까? 내 말은 : 1 개가 실행되면 8 개의 키워드를 실행하고 1 개를 실행하십시오. 이 경우 8 가지 작업 프로세스가 영구적으로 유지됩니다. 어떤 생각?

+0

은 사용으로 간주 되세요 ['TPL DataFlow'] (https://msdn.microsoft.com/en-us /library/hh228603(v=vs.110).aspx) 항목을 처리하기 위해 파이프 라인을 설정 하시겠습니까? –

+0

이렇게 들리는군요. https://msdn.microsoft.com/en-us/library/system.threading.tasks.paralleloptions.maxdegreeofparallelism (v=vs.110).aspx –

+0

@MatthewWatson, 방금 내가 존재한다는 것을 배웠고, 나는 이것, 고맙습니다! – Gun

답변

2

TPL Dataflow을 사용하여 어떻게 접근 할 수 있는지 보여주는 몇 가지 샘플 코드가 있습니다.

이것을 컴파일하려면 NuGet을 통해 프로젝트에 TPL 데이터 흐름을 추가해야합니다.

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Threading.Tasks; 
using System.Threading.Tasks.Dataflow; 

namespace Demo 
{ 
    class Keyword // Dummy test class. 
    { 
     public string Name; 
    } 

    class Program 
    { 
     static void Main() 
     { 
      // Dummy test data. 
      var keywords = Enumerable.Range(1, 100).Select(n => n.ToString()).ToList(); 

      var result = DoWork(keywords).Result; 

      Console.WriteLine("---------------------------------"); 

      foreach (var item in result) 
       Console.WriteLine(item.Name); 
     } 

     public static async Task<List<Keyword>> DoWork(List<string> keywords) 
     { 
      var input = new TransformBlock<string, Keyword> 
      (
       async s => await Work(s), 
       // This is where you specify the max number of threads to use. 
       new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 } 
      ); 

      var result = new List<Keyword>(); 

      var output = new ActionBlock<Keyword> 
      (
       item => result.Add(item), // Output only 1 item at a time, because 'result.Add()' is not threadsafe. 
       new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 } 
      ); 

      input.LinkTo(output, new DataflowLinkOptions { PropagateCompletion = true }); 

      foreach (string s in keywords) 
       await input.SendAsync(s); 

      input.Complete(); 
      await output.Completion; 

      return result; 
     } 

     public static async Task<Keyword> Work(string s) // Stubbed test method. 
     { 
      Console.WriteLine("Processing " + s); 

      int delay; 
      lock (rng) { delay = rng.Next(10, 1000); } 
      await Task.Delay(delay); // Simulate load. 

      Console.WriteLine("Completed " + s); 
      return await Task.Run(() => new Keyword { Name = s }); 
     } 

     static Random rng = new Random(); 
    } 
} 
1

이 작업을 수행하는 또 다른 더 쉬운 방법은 사용하는 것입니다 AsyncEnumerator NuGet Package :

using System.Collections.Async; 

public async Task<List<Keyword>> DoWord(List<string> keywords) 
{ 
    var keywordResults = new ConcurrentBag<Keyword>(); 
    await keywords.ParallelForEachAsync(async keyword => 
    { 
     try 
     { 
      var result = await Work(keyword); 
      keywordResults.Add(result); 
     } 
     catch (AggregateException ae) 
     { 
      foreach (Exception innerEx in ae.InnerExceptions) 
      { 
       log.ErrorFormat("Core threads exception: {0}", innerEx); 
      } 
     } 
    }, maxDegreeOfParallelism: 8); 
    return keywordResults.ToList(); 
}