2014-11-17 2 views
1

TPL을 사용하여 속도를 높이기 위해 일부 프로세스 집중 루핑을 다시 작성했습니다. 스레딩을 시도한 것은 이번이 처음이므로, 내가하고있는 일을 확인하고 싶은 것은 올바른 방법입니다.C# Parallel.foreach - 변수를 스레드로 안전하게 설정하기

결과 좋다 - Parallel.ForEach 루프 표준 foreach 루프에서 이동하는 경우 DataTable 1000 행의 데이터를 처리 구분 34 분의 처리 시간을 감소시켰다. 이 테스트에서는 로그 파일에 데이터를 쓰고 카운터를 증가시키는 것과 같은 스레드가 아닌 안전한 작업을 제거했습니다.

다시 로그 파일에 기록하고 카운터를 증가시켜야하므로 스트림러/증분 코드 블록을 포함하는 잠금을 구현하려고했습니다.

FileStream filestream = new FileStream("path_to_file.txt", FileMode.Create); 
StreamWriter streamwriter = new StreamWriter(filestream); 
streamwriter.AutoFlush = true; 

try 
{ 
    object locker = new object(); 

    // Lets assume we have a DataTable containing 1000 rows of data. 
    DataTable datatable_results; 

    if (datatable_results.Rows.Count > 0) 
    { 
     int row_counter = 0; 

     Parallel.ForEach(datatable_results.AsEnumerable(), data_row => 
     { 
      // Process data_row as normal. 

      // When ready to write to log, do so. 
      lock (locker) 
      { 
       row_counter++; 
       streamwriter.WriteLine("Processing row: {0}", row_counter); 

       // Write any data we want to log. 
      } 
     });      
    } 
} 
catch (Exception e) 
{ 
    // Catch the exception. 
} 
streamwriter.Close(); 

위의 내용은 최소한의 성능 비용 (여전히 9 분의 실행 시간)으로 예상대로 작동하는 것 같습니다. 물론, 자물쇠에 포함 된 동작은 그다지 중요하지 않습니다. 자물쇠 내의 코드를 처리하는 데 걸리는 시간이 길어질수록 스레드가 잠길수록 처리 시간에 영향을 미칩니다.

내 질문 : 위의 작업을 수행하는 효율적인 방법입니까, 아니면 더 빠르고 안전한 다른 위의 다른 방법이 있습니까?

원래 DataTable에는 실제로 30000 개의 행이 포함되어 있습니다. 이 DataTable을 1000 행 단위로 나누어 처리 한 다음 한 번에 모든 300000 행을 처리하는 대신 Parallel.ForEach에서 처리해야 할 것이 있습니까?

+2

이 질문은 [Code Review] (http://codereview.stackexchange.com) –

+0

에 속하므로 위의 코드는 컴파일되거나 실행되지 않습니다. 직접 추출물이 아닙니다. 필자는 멀티 스레딩과 변수를 스레드로부터 안전하게하는 이론에 더 관심이 있습니다. 그래도 고마워. – dalemac

답변

5

파일에 쓰기가 비싸고 파일에 쓰는 동안 독점적 인 잠금을 사용하고 있습니다. 그것은 논쟁을 일으킬 것입니다.

버퍼에 추가 한 다음 한꺼번에 파일에 쓸 수 있습니다. 그것은 논쟁을 없애고 규모를 키워야합니다. 당신은 루프 카운터를 유지 할 필요가 없습니다

if (datatable_results.Rows.Count > 0) 
{ 
    ConcurrentQueue<string> buffer = new ConcurrentQueue<string>(); 
    Parallel.ForEach(datatable_results.AsEnumerable(), (data_row, state, index) => 
    { 
     // Process data_row as normal. 

     // When ready to write to log, do so. 

     buffer.Enqueue(string.Format("Processing row: {0}", index)); 
    }); 

    streamwriter.AutoFlush = false; 
    string line; 
    while (buffer.TryDequeue(out line)) 
    { 
     streamwriter.WriteLine(line); 
    } 
    streamwriter.Flush();//Flush once when needed 
} 
  1. 참고 Parallel.ForEach는 당신에게 하나를 제공합니다. 차이점은 카운터가 아니라 색인 인 입니다. 내가 예상 한 동작을 변경했다면 은 여전히 ​​카운터를 다시 추가하고 Interlocked.Increment을 까지 증가시킬 수 있습니다.
  2. 나는 당신이 streamwriter.AutoFlush = true을 사용하고있는 것을보고, 성능을 해칠 것입니다. false으로 설정하고 모든 데이터를 쓰고 나면 플러시하십시오.

StreamWriter을 using 문으로 마무리하면 스트림을 플러시 할 필요가 없습니다 (무료로 가져올 수 있음).

또는 작업을 잘 수행하는 로깅 프레임 워크를 살펴볼 수도 있습니다. 예 : NLog, Log4net 등

+0

하지만 로깅은 ** 정말 이상합니다. 당신이가는 동안 당신은 기록하지 않습니다, 당신은 마지막에 기록합니다. 따라서 무언가가 잘못되면 로그에서 어디로 갔는지에 대한 정보를 얻지 못합니다. – RobH

+0

@RobH 내가 무슨 뜻인지 모르겠다. 걱정할 필요가 없다면 로깅 부분을 마지막으로 차단하지 않겠습니까? –

+0

현재 작업이 진행되는 동안 로그를보고 뭔가하는 것을 볼 수 있습니다. 이제 로그가 끝날 때까지 기다렸다가 한꺼번에 로그를 읽어야합니다. 또한 확장 불가능한 메모리의 모든 로깅 정보를 보유해야합니다. – RobH

1

당신은 특정 로그 파일을 스레드로 로깅을 피하거나 로그인하는 경우,이 문제를 개선하려고 할 수 있습니다 당신은 많은 코어를 (확실하지가 당신에게 의미가있는 경우) 많은 스레드로

TPL 시작 Does Parallel.ForEach limits the number of active threads?.

그래서 당신이 할 수있는 것입니다 :

1) 많은 코어 내부에 많은 요소, 대상 시스템

2) 카운터의 목록을 만들기에 코어의 숫자를 얻기는

이 3) 모든 코어에 대한 카운터 업데이트

4) 병렬 실행이 종료 된 후 모두 합합니다.

그래서, 실제로 :

//KEY(THREAD ID, VALUE: THREAD LOCAL COUNTER) 
Dictionary<int,int> counters = new Dictionary<int, int>(NUMBER_OF_CORES); 
.... 
Parallel.ForEach(datatable_results.AsEnumerable(), data_row => 
     { 
      // Process data_row as normal. 

      // When ready to write to log, do so. 
      //lock (locker) //NO NEED FOR LOCK, EVERY THREAD UPDATES ITS _OWN_ COUNTER 
      //{ 
       //row_counter++; 

       counters[Thread.CurrentThread.ManagedThreadId].Value +=1; 

       //NO WRITING< OR WRITING THREAD SPECIFIC FILE ONLY 
       //streamwriter.WriteLine("Processing row: {0}", row_counter); 


      //} 
     });    
.... 

//AFTER EXECUTION OF PARALLEL LOOP SUM ALL COUNTERS AND GET TOTAL OF ALL THREADS. 

이 drammatically 성능을 향상 모든에서 envolved이 그 없는 잠금,의 혜택을 누릴 수 있습니다. .Net concurent 모음을 사용할 때, 그들은 항상 내부에서 어떤 종류의 잠금을 사용합니다.

자연스럽게 기본 아이디어입니다. 붙여 넣기를 복사하면 예상대로 작동하지 않을 수 있습니다. 우리는 항상 어려운 주제 인 멀티 스레딩에 대해 이야기합니다. 그러나, 잘하면, 그것은 당신에게 중계 할 몇 가지 아이디어를 제공합니다.

+0

아아, 스레딩에서 처음 시도한 것처럼 스레드 별 로그 파일은 나에게 의미가 없습니다. 정교하게 생각해? – dalemac

+1

** ** ** 실시간으로 어떤 것을 기록해야한다면 다른 스레드에서 같은 파일에 쓰지 않아도 동기화 문제를 피할 수 있습니다. 내가 제안한 것은 실시간 로깅을 피하는 것이고, 결과에 대한 최종 덤프 파일을 작성하고 계산을 수행하는 것입니다. 런타임시 로그 데이터로 문자열을 어셈블 할 수 있지만 병렬 루프 종료 후에 만 ​​해당 문자열을 파일에 기록 할 수 있습니다. 계산이 오랜 대기 시간을 포함하지 않는다면 이것은 자연스러운 일입니다. – Tigran

0

이것은 병렬 코드를 사용하는 코드입니다. 개념은 비슷하고 구현하기가 더 쉽습니다.참고로, 디버깅을 위해 일반 for 루프를 코드에 유지하고 병렬 코드를 조건부로 컴파일합니다. 희망이 도움이됩니다. 이 시나리오에서 i 값은 처리되는 레코드 수와 동일하지 않습니다. 카운터를 생성하고 잠금을 사용하여 값을 추가 할 수 있습니다. 카운터가있는 다른 코드의 경우 잠금을 사용하지 않고 더 느린 코드를 피하기 위해 잠시 값을 허용했습니다. 처리 된 레코드의 수를 나타내는 상태 메커니즘이 있습니다. 내 구현의 경우 카운트가 문제가되지 않는 약간의 기회가 있습니다. 루프가 끝날 때 모든 레코드가 처리되었다는 메시지가 나타납니다.

#if DEBUG 
     for (int i = 0; i < stend.PBBIBuckets.Count; i++) 
     { 
      //int serverIndex = 0; 
#else 
     ParallelOptions options = new ParallelOptions(); 
     options.MaxDegreeOfParallelism = m_maxThreads; 

     Parallel.For(0, stend.PBBIBuckets.Count, options, (i) => 

     { 
#endif 
      g1client.Message request; 
      DataTable requestTable; 

      request = new g1client.Message(); 

      requestTable = request.GetDataTable(); 

      requestTable.Columns.AddRange(
       Locations.Columns.Cast<DataColumn>().Select(x => new DataColumn(x.ColumnName, x.DataType)).ToArray 
        ()); 

      FillPBBIRequestTables(requestTable, request, stend.PBBIBuckets[i], stend.BucketLen[i], stend.Hierarchies); 
#if DEBUG 
     } 
#else 
     }); 
#endif 
0

새로운 방법으로 병렬 코드를 전송할 수 있습니다. 예를 들어 모든

// Class scope 
    private string GetLogRecord(int rowCounter, DataRow row) 
    { 
     return string.Format("Processing row: {0}", rowCounter); // Write any data we want to log. 
    } 

    //.... 
    Parallel.ForEach(datatable_results.AsEnumerable(), data_row => 
    { 
     // Process data_row as normal. 

     // When ready to write to log, do so. 
     lock (locker) 
      row_counter++; 

     var logRecord = GetLogRecord(row_counter, data_row); 

     lock (locker) 
      streamwriter.WriteLine(logRecord); 
    }); 
1

첫째, 카운터를 증가 및 로그 파일에 작성하는 테이블에서 행과 아마 몇 밀리 초를 처리하는 데 약 2 초 정도 걸립니다. 실제 처리가 직렬화해야하는 부분의 1000 배 이상인 경우이 메서드는 별 문제가되지 않습니다.

또한 구현 한 방식은 완전히 견고합니다. 최적화 방법은 있지만 상황에 따라 구현할 가치는 없습니다.

증분에 대한 잠금을 피하는 한 가지 유용한 방법은 Interlocked.Increment을 사용하는 것입니다. x++보다 조금 느리지 만 lock {x++;}보다 훨씬 빠릅니다. 그러나 귀하의 경우에는 중요하지 않습니다.

파일 출력은 어쨌든 출력이 직렬화되므로 기껏해야 잠금 시간을 최소화 할 수 있습니다. 잠금을 입력하기 전에 모든 출력을 버퍼링 한 다음 잠금 내에서 쓰기 작업을 수행하면됩니다. I/O에서 불필요한 블로킹을 피하려면 비동기 쓰기를 원할 것입니다.

관련 문제