2012-05-14 5 views
3

많은 프로젝트/파일을 컴파일해야하는 응용 프로그램을 테스트하고 있습니다.Parallel.ForEach 끝에 스레드가 없습니다

Parallel을 통해 처리해야하는 ConucrrentBag가 있습니다.

Parallel.ForEach(m_files, new ParallelOptions 
      { 
       MaxDegreeOfParallelism = MaxProcesses, 

      }, currFile => ProcessSingle(currFile.ToString())); 

MaxProcess의 양이 LogicalCpu * 2 : 병렬에 대한

private readonly ConcurrentBag<string> m_files; 

내 전화는 이것이다.

140 개의 프로젝트를 컴파일 할 때 Parallel은 선형 적은 스레드를 시작합니다. 최소한 마지막 4 개 프로젝트에 대해 하나의 스레드 만 실행 중입니다. 좋지는 않지만 괜찮아. 이제

내 문제 :

나는 14000+ 프로젝트 (그것의 COBOL 소스 ;-)과 정말 큰 시스템) 마지막 모듈이 컴파일되지 않습니다에 대해 컴파일하고있어

때문에 Parallel.ForEach 하는게 아닙니다 ' 이것에 대한 새로운 쓰레드가 시작되었습니다. 이 시점에서 워킹 쓰레드는 살아 있지 않습니다. 그러나 concurrentBag에는 여전히 140 개의 항목이 있습니다.

아무도 아이디어를 어떻게 해결할 수 있습니까?

편집 : 해당 문제는 컴파일러를 실행할 때 발생합니다. ... 그것은의 벌금을 작동 (빠른 테스트) 컴파일러를 실행하지 않고

편집 : 나는 Parallel.ForEach 프로세스를 시작할 때

가 ConcurrentBag 이미 completly 가득합니다. 자세한 내용은

, SingleProcess의 코드 : 여기

private void ProcessSingle(string item) 
     { 
      Monitor.Enter(lockingObj); 
      if (m_files.TryTake(out item)) 
      { 
       if (CompilingModules <= 0) 
       { 
        OnQueueStarted(new EventArgs()); 
       } 
       CompilingModules++; 
       Monitor.Exit(lockingObj); 
       OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.Done, ItemQueueObject.String)); 

       OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.Dequeued, ItemQueueObject.String)); 
       using (CobolCompiler compiler = new CobolCompiler()) 
       { 
        compiler.OutputDataReceived += (sender, e) => OnOutputDataReceived(e); 
        compiler.Compile(item); 
        Thread.Sleep(2000); 
        if (compiler.LinkFailure) 
        { 
         if (ObjWithoutDll.ContainsKey(item)) 
         { 
          if (ObjWithoutDll[item] <= 2) 
          { 
           m_files.Add(item); 
           OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.Enqueued, ItemQueueObject.String)); 
           ObjWithoutDll[item]++; 
          } 
          else 
          { 
           OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.LinkError, ItemQueueObject.String)); 
           ObjWithoutDll.Remove(item); 
          } 
         } 
         else 
         { 
          ObjWithoutDll.Add(item, 0); 
          m_files.Add(item); 
          OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.Enqueued, ItemQueueObject.String)); 
         } 
        } 
        else 
        { 
         if (compiler.DllExisting) 
         { 
          ObjWithoutDll.Remove(item); 
         } 
         OnQueueItemStateChanged(compiler.DllExisting ? new ItemQueueEventArgs(item, null, ItemQueueType.Done, ItemQueueObject.String) : new ItemQueueEventArgs(item, null, ItemQueueType.Failed, ItemQueueObject.String)); 
        } 

       } 

       Monitor.Enter(lockingObj); 
       CompiledModules++; 
       if (CompiledModules % 300 == 0) 
       { 
        Thread.Sleep(60000); 
       } 
       CompilingModules--; 
       if (CompilingModules <= 0 && m_files.Count <= 0) 
       { 

        try 
        { 
         Process prReschk = new Process(); 
         FileInfo batch = new FileInfo(@"batches\reschkdlg.cmd"); 
         if (!batch.Exists) 
         { 
          Assembly _assembly = Assembly.GetExecutingAssembly(); 
          StreamReader _textStreamReader = new StreamReader(_assembly.GetManifestResourceStream(@"Batches\reschkdlg.cmd")); 
         } 

         if (!File.Exists(Config.Instance.WorkingDir + @"reschkdlg.exe")) 
         { 
          File.Copy(Config.Instance.VersionExeDirectory + @"reschkdlg.exe", Config.Instance.WorkingDir + @"reschkdlg.exe"); 
         } 

         prReschk.StartInfo.FileName = @"cmd.exe"; 
         prReschk.StartInfo.Arguments = @"/c " + batch.FullName + " " + Config.Instance.Version.Replace(".", "") + " " + @"*" + " " + Config.Instance.WorkingDir; 
         prReschk.StartInfo.CreateNoWindow = true; 
         prReschk.StartInfo.UseShellExecute = false; 
         prReschk.Start(); 
         prReschk.Close(); 
         prReschk.Dispose(); 
        } 
        catch 
        { 
        } 

        OnQueueFinished(new EventArgs()); 
       } 
      } 
      Monitor.Exit(lockingObj); 
     } 

CobolCompiler 클래스의 Codesnippet :

공공 무효 컴파일 (문자열 파일) 만 {

 file = file.ToLower(); 

     Process prCompile = new Process(); 
     Dir = Directory.CreateDirectory(c.WorkingDir + random.Next() + "\\"); 

     try 
     { 
      // First clean up the folder 
      CleanUpFolder(true, file); 

      // First set lock and copy all sources 
      Monitor.Enter(lockingObj); 
      if (filesToCopy == null) 
      { 
       CopySource(Dir.FullName); 
      } 
      Monitor.Exit(lockingObj); 

      FileInfo batch = new FileInfo(@"batches\compile.cmd"); 
      if (!batch.Exists) 
      { 
       Assembly _assembly = Assembly.GetExecutingAssembly(); 
       StreamReader _textStreamReader = new StreamReader(_assembly.GetManifestResourceStream(@"Batches\compile.cmd")); 
       _textStreamReader.Dispose(); 
      } 

      prCompile.StartInfo.FileName = @"cmd.exe"; 
      prCompile.StartInfo.Arguments = @"/c " + batch.FullName + " " + c.Version.Replace(".", "") + " " + file.Remove(file.LastIndexOf('.')) + " " + Dir.FullName + " " + Dir.FullName.Remove(Dir.FullName.IndexOf(@"\")); 
      prCompile.StartInfo.CreateNoWindow = true; 
      prCompile.StartInfo.UseShellExecute = false; 
      prCompile.StartInfo.RedirectStandardOutput = true; 
      prCompile.StartInfo.RedirectStandardError = true; 
      prCompile.StartInfo.WorkingDirectory = Assembly.GetExecutingAssembly().Location.Remove(Assembly.GetExecutingAssembly().Location.LastIndexOf("\\") + 1); 
      prCompile.EnableRaisingEvents = true; 
      prCompile.OutputDataReceived += prCompile_OutputDataReceived; 
      prCompile.ErrorDataReceived += prCompile_OutputDataReceived; 
      prCompile.Start(); 
      prCompile.BeginErrorReadLine(); 
      prCompile.BeginOutputReadLine(); 
      prCompile.WaitForExit(); 
      prCompile.Close(); 
      prCompile.Dispose(); 

      CleanUpFolder(false, file); 

      if (File.Exists(Config.Instance.WorkingDir + file.Remove(file.LastIndexOf('.')) + ".dll") || File.Exists(Config.Instance.WorkingDir + file.Remove(file.LastIndexOf('.')) + ".exe")) 
      { 
       dllExisting = true; 
       linkFailure = false; 
      } 
      else 
      { 
       if (File.Exists(Config.Instance.WorkingDir + file.Remove(file.LastIndexOf('.')) + ".obj")) 
       { 
        linkFailure = true; 
       } 
       dllExisting = false; 
      } 



     } 
     catch (ThreadAbortException) 
     { 
      if (prCompile != null) 
      { 
       // On Error kill process 
       prCompile.Kill(); 
       prCompile.Dispose(); 
      } 
     } 
     catch (Win32Exception) 
     { 
     } 
     catch (Exception) 
     { 
      dllExisting = false; 
     } 

     while (true) 
     { 
      try 
      { 
       if (Directory.Exists(Dir.FullName)) 
       { 
        Directory.Delete(Dir.FullName, true); 
        break; 
       } 
       else 
       { 
        break; 
       } 
      } 
      catch 
      { 
      } 
     } 


    } 
private void CopySource(string Destination) 
{ 
    filesToCopy = new StringCollection(); 
    foreach (string strFile in Directory.GetFiles(c.WorkingDir)) 
    { 
     string tmpStrFile = strFile.ToLower(); 

     foreach (string Extension in c.Extensions) 
     { 
      if (tmpStrFile.Contains(Extension)) 
      { 
       filesToCopy.Add(tmpStrFile); 
      } 
     } 
    } 

    if (filesToCopy.Count > 0) 
    { 
     foreach (string strFile in filesToCopy) 
     { 
      File.Copy(strFile, Destination + strFile.Remove(0, strFile.LastIndexOf("\\"))); 
     } 
    } 
} 

private void CleanUpFolder(bool PreCleanup, string Filename) 
{ 
    //Copy all files from compilationfolder to working directory 
    if (!PreCleanup) 
    { 
     foreach (string strFile in Directory.GetFiles(Dir.FullName, Filename.Remove(Filename.LastIndexOf(".") + 1) + "*")) 
     { 
      FileInfo fileToMove = new FileInfo(strFile); 

      if (fileToMove.Name.ToLower().Contains(Filename.Remove(Filename.LastIndexOf(".")))) 
      { 
       File.Copy(strFile, c.WorkingDir + fileToMove.Name, true); 
      } 
     } 
    } 

    //Delete useless files 
    foreach (string filename in Directory.GetFiles(Config.Instance.WorkingDir, Filename.Remove(Filename.LastIndexOf("."))+".*")) 
    { 
     bool foundExt = c.Extensions.Contains(filename.Remove(0, filename.LastIndexOf(".") + 1)); 
     if (PreCleanup) 
     { 
      // Only delete files, which are not won't be compiled 
      if(!foundExt) 
      { 
       File.Delete(filename); 
      } 
     } 
     else 
     { 
      if (!Config.Instance.SaveLspFile && filename.Contains(".lsp")) 
      { 
       File.Delete(filename); 
      } 

      if (!Config.Instance.SaveLstFile && filename.Contains(".lst")) 
      { 
       File.Delete(filename); 
      } 
     } 
    } 
} 

public void Dispose() 
{ 
    Dispose(true); 
    GC.SuppressFinalize(this); 
} 

protected virtual void Dispose(bool disposing) 
{ 
    if (!disposed) 
    { 
     if (disposing) 
     { 
      Dir = null; 
     } 
     disposed = true; 
    } 
} 

~CobolCompiler() 
{ 
    Dispose (false); 
} 

내가 모든 컴파일 과정 후 2 초 동안 자고 시험해 보았다. 그러나 이것은 아무 것도 바뀌지 않습니다.

컴파일이 진행되는 동안 CPU는 100 %입니다. 응용 프로그램에서 270MB RAM을 수집 중입니다. 시작시 35MB 밖에되지 않습니다.

affraid하지 마십시오. 컴파일러가 동일한 작업 디렉토리에서 동시에 여러 파일을 컴파일 할 수 없기 때문에 임시 폴더에 모든 소스를 복사해야합니다.

편집 : 이미 스레드가 없지만 항목이 남아있는 문제를 해결했습니다.

ProcessSingle에서 dll에 연결되지 않았을 때 다시 컴파일하려고했던 항목을 추가합니다.

그래서 14000 개의 항목으로 시작하여 Parallel.ForEach를 처리하는 동안이 concurrentBag에 항목을 다시 추가했습니다 (연결에 실패한 경우). 그래서 ForEach를 14000 회 실행하고 xxx 모듈을 다시 컴파일해야합니다.:-(

나는 WaitForExit없이 prReschk의 실행이 때문에 이상 14000 개 항목에 대해 능숙를 확인하는. 것입니다. 그 볼 시간이 오래 걸립니다 새로운 컴파일을 방해하지 말아야하지 않았다.

하지만를 ConcurrentBag의 끝 부분에 쓰레드가 적을 때의 문제는 여전히 존재합니다 : (그러나주기가 많은 경우에만주의 사항입니다.)

+0

'ProcessSingle()'이 처리하지 않는 예외를 던지고 전체 실행을 중단합니까? –

+0

Parallel.ForEach() 전에 ConcurrentBag가 완전히 채워 졌습니까? 그렇다면 보통의'List <> '. 그렇다면 이것이 잘못된 방법으로 처리 할 수 ​​있습니다. –

+0

ProcessSingle()을 시작하기 전에 예외가 없습니다 ... – Demigod

답변

2

Parallel.ForEach 메소드는 .Net ThreadPool을 사용하여 스레드를 할당합니다. 병렬로 실행될 쓰레드의 수는 시스템 CPU의 부하에 따라 ThreadPool에 의해 결정됩니다. 따라서 MaxDegreeOfParallelism을 지정했을 수 있습니다 만 이것이 최대 값 일 때 T hreadPool은이 최대 값보다 적은 수의 스레드를 할당하기로 결정할 수 있습니다.

귀하의 질문에 귀하가 제시 한 증거에 따르면 컴파일 과정에서 시스템 자원을 사용하고 나중에 정리하지 않는 것처럼 들립니다. 이것은 140 개의 컴파일이 할당되는 스레드의 점진적인 감소로 끝나는 이유를 설명 할 것입니다. ThreadPool은 CPU가 무겁게로드되었다고 생각하기 때문에 새로운 스레드를 할당하지 않습니다.

컴파일 프로세스가 어떻게 종료되는지 자세히 살펴 보겠습니다. 컴파일이 완전히 완료되기 전에 ProcessSingle 메서드가 반환됩니까? 컴파일 프로세스에서 메모리 누수가 있습니까? ThreadPool이 다시 제어를 전달하기 전에

System.Threading.Thread.Sleep(2000); 

이 2 초 동안 스레드를 일시 중지됩니다 : 당신이 ProcessSingle이 호출 한 후 다음 행을 추가 한 경우이 다르게 행동하면

실험으로

, 내가 알고 관심을 가질 것 다음 작업을 할당합니다. 응용 프로그램의 동작을 향상 시키면 내 이론이 옳다고 강력하게 제안됩니다.

+0

나는 시도했다 .. 변경 없음 :(처음에는 모두 괜찮습니다 ...하지만 결국 스레드 수를 최소화합니다 ... 제가 14000+에 대해 컴파일 할 때 스레드는 마지막 160 개에 남았습니다. 140 번 남은 후 쓰레드:/그리고 140 개의 모듈을 컴파일 할 때 마지막 6 개 모듈의 스레드를 제거합니다 ... – Demigod

+0

Demigod, 나머지 코드를 게시 해 주셔서 감사합니다. 내게 사용할 수있는 COBOL 컴파일 환경이 없으므로 코드를 테스트 할 수는 없지만이 코드를 읽으면 동작을 설명 할 수있는 두 가지 문제점을 발견했습니다. – Kev

+0

1. prReschk.Start(); - WaitForExit 호출이 뒤따라야합니다. 다른 모든 Process를 재확인 할 것입니다. 시작하라는 메시지가 나오기를 기다리십시오. 2. ConcurrentBag의 내용을 읽기위한 혼란스런 메커니즘이있는 것 같습니다. Parallel.ForEach 메서드는 컬렉션을 반복하고 각 멤버를 람다 식으로 전달합니다. 하지만 ProcessSingle에서 'm_files.TryTake (out item)'을 사용하여 ConcurrentBag의 다른 멤버를 가져 와서 시작하십시오. 이 TryTake를 제거하십시오, 이것은 거의 확실하게 당신에게 어떤 문제를 일으킬 것입니다, 그것은 당신이 가지고있는 문제를 설명 할 수 있습니다. – Kev

0

CopySource이 던졌습니다. 출시되지 않은 잠금 lockingObj이 있으며 더 이상 진행할 수 없습니다. finally 블록을 사용하여 잠금을 해제하는 lock (lockingObj)을 사용하십시오.