2011-01-12 4 views
14

작업이 완료되고 스레드 동기화가 완료 될 때까지 기다리는 주제입니다.Parallel.ForEach - 정상적인 취소

현재 Parallel.ForEach 내에 동봉 된 반복이 있습니다. 아래 예제에서 루프의 정상적인 종료 (.NET 4.0)를 처리하는 가장 좋은 방법에 대한 의견에 몇 가지 질문을 제기했습니다.

private void myFunction() 
    { 

     IList<string> iListOfItems = new List<string>(); 
     // populate iListOfItems 

     CancellationTokenSource cts = new CancellationTokenSource(); 

     ParallelOptions po = new ParallelOptions(); 
     po.MaxDegreeOfParallelism = 20; // max threads 
     po.CancellationToken = cts.Token; 

     try 
     { 
      var myWcfProxy = new myWcfClientSoapClient(); 

      if (Parallel.ForEach(iListOfItems, po, (item, loopsate) => 
      { 
       try 
       { 
        if (_requestedToStop) 
         loopsate.Stop(); 
        // long running blocking WS call, check before and after 
        var response = myWcfProxy.ProcessIntervalConfiguration(item); 
        if (_requestedToStop) 
         loopsate.Stop(); 

        // perform some local processing of the response object 
       } 
       catch (Exception ex) 
       { 
        // cannot continue game over. 
        if (myWcfProxy.State == CommunicationState.Faulted) 
        { 
         loopsate.Stop(); 
         throw; 
        } 
       } 

       // else carry on.. 
       // raise some events and other actions that could all risk an unhanded error. 

      } 
      ).IsCompleted) 
      { 
       RaiseAllItemsCompleteEvent(); 
      } 
     } 
     catch (Exception ex) 
     { 
      // if an unhandled error is raised within one of the Parallel.ForEach threads, do all threads in the 
      // ForEach abort? or run to completion? Is loopsate.Stop (or equivalent) called as soon as the framework raises an Exception? 
      // Do I need to call cts.Cancel here? 

      // I want to wait for all the threads to terminate before I continue at this point. Howe do we achieve that? 

      // do i need to call cts.Dispose() ? 

      MessageBox.Show(Logging.FormatException(ex)); 
     } 
     finally 
     { 

      if (myWcfProxy != null) 
      { 
      // possible race condition with the for-each threads here unless we wait for them to terminate. 
       if (myWcfProxy.State == System.ServiceModel.CommunicationState.Faulted) 
        myWcfProxy.Abort(); 

       myWcfProxy.Close(); 
      } 

      // possible race condition with the for-each threads here unless we wait for them to terminate. 
      _requestedToStop = false; 

     } 

    } 

모든 도움을 주시면 감사하겠습니다. MSDN 문서는 ManualResetEventSlim과 cancellationToken.WaitHandle 's에 대해 이야기합니다. 그러나 이것에 연결하는 방법을 모르는 경우 MSDN 예제를 이해하는 데 어려움을 겪고있는 것 같습니다.

답변

8

귀하의 질문에 대한 답변을 드릴 수있는 아래의 코드를 조롱했습니다. 기본 점은 Parallel.ForEach를 사용하여 포크/조인 병렬 처리를 수행하므로 병렬 작업 외부의 경합 조건에 대해 걱정할 필요가 없습니다 (호출 스레드는 작업이 완료되거나 성공적으로 완료 될 때까지 차단됩니다). LoopState 변수 (람다의 두 번째 인수)를 사용하여 루프 상태를 제어하기 만하면됩니다.

루프 반복이 처리되지 않은 예외를 던진 경우 전체 루프에서 끝에 잡힌 AggregateException을 발생시킵니다. 이 주제를 언급

다른 링크 : 통찰력에 대한

Parallel.ForEach throws exception when processing extremely large sets of data

http://msdn.microsoft.com/en-us/library/dd460720.aspx

Does Parallel.ForEach limits the number of active threads?

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Threading; 
using System.Threading.Tasks; 
using System.ServiceModel; 

namespace Temp 
{ 
    public class Class1 
    { 
     private class MockWcfProxy 
     { 
      internal object ProcessIntervalConfiguration(string item) 
      { 
       return new Object(); 
      } 

      public CommunicationState State { get; set; } 
     } 

     private void myFunction() 
     { 

      IList<string> iListOfItems = new List<string>(); 
      // populate iListOfItems 

      CancellationTokenSource cts = new CancellationTokenSource(); 

      ParallelOptions po = new ParallelOptions(); 
      po.MaxDegreeOfParallelism = 20; // max threads 
      po.CancellationToken = cts.Token; 

      try 
      { 
       var myWcfProxy = new MockWcfProxy(); 

       if (Parallel.ForEach(iListOfItems, po, (item, loopState) => 
        { 
         try 
         { 
          if (loopState.ShouldExitCurrentIteration || loopState.IsExceptional) 
           loopState.Stop(); 

          // long running blocking WS call, check before and after 
          var response = myWcfProxy.ProcessIntervalConfiguration(item); 

          if (loopState.ShouldExitCurrentIteration || loopState.IsExceptional) 
           loopState.Stop(); 

          // perform some local processing of the response object 
         } 
         catch (Exception ex) 
         { 
          // cannot continue game over. 
          if (myWcfProxy.State == CommunicationState.Faulted) 
          { 
           loopState.Stop(); 
           throw; 
          } 

          // FYI you are swallowing all other exceptions here... 
         } 

         // else carry on.. 
         // raise some events and other actions that could all risk an unhanded error. 
        } 
       ).IsCompleted) 
       { 
        RaiseAllItemsCompleteEvent(); 
       } 
      } 
      catch (AggregateException aggEx) 
      { 
       // This section will be entered if any of the loops threw an unhandled exception. 
       // Because we re-threw the WCF exeption above, you can use aggEx.InnerExceptions here 
       // to see those (if you want). 
      } 
      // Execution will not get to this point until all of the iterations have completed (or one 
      // has failed, and all that were running when that failure occurred complete). 
     } 

     private void RaiseAllItemsCompleteEvent() 
     { 
      // Everything completed... 
     } 
    } 
} 
+0

감사합니다. 나는 여기서 "다른 모든 예외를 삼키는"지점을 지적해야합니다. 웹 서비스 또는 클라이언트 측 WCF 예외를 기록하는 로깅 호출을 만들고 있습니다. 예외로 인해 무효화 된 WCF 프록시가 발생하지 않으면 루프가 계속 진행됩니다. 시간 초과 오류 또는 서버 측 오류 예외가 예상됩니다. 이 특정 기능에 대한 정보는 캐치의 기능을 완화해야합니다. 그러나 우리는 로그 파일을 검토 할 것이며 그러한 예외는 조사 될 것입니다. – Terry

+0

Parallel.ForEach에 대해 혼란 스러웠던 점은 풀의 모든 스레드가 완료 될 때까지 (예 : 캐시 된 캐시 또는 캐시되지 않은 캐시) 블로킹 호출이라고 가정했기 때문입니다. 그러나 예를 들어 중단 점에서 실행되는 것으로보고 된 스레드의 수는 catch (AggregateException aggEx) 블록은 VS 2010 스레드 뷰어에서 20 개의 스레드로보고합니다. 그래서 sysinternals가 나왔고 디버깅중인 vshost 실행 파일을 살펴 보았고 UI와 메시지 펌프를 포함한 22 개의 스레드도 보여주었습니다. – Terry

+0

또한 루프 내에서 발생한 이벤트는 함수가 실행되고 finally 블록이 실행 된 후 추가 예외가 발생합니다. – Terry

관련 문제