1

푸른 테이블의 데이터 - 약 5k 개의 테이블을 읽고 다른 메트릭을 수집하고이를 다른 비어있는 테이블에 저장합니다. 비동기 방식으로 모든 것을 저장합니다. 내가 직면하고있는 문제는 때로는 일어날 수있는 거대한 데이터가있을 때 응용 프로그램이 멈추는 것입니다. 동일한 코드가 적은 데이터로도 잘 작동합니다. 내가 뭐하는 거지 단계는 (그들 모두 수신이 비동기 사용하여 비동기 및 기다리고 있습니다)데이터가 적을 때만 비동기

  1. 1 & 2 병렬에있는 모든 테이블을 이전 메트릭 데이터를 (읽기 푸른
  2. 에서 모든 테이블 이름을 읽기 - Task.WhenAll)
  3. 각 테이블 프로세스에서 데이터를 가져 오기 및 저장 (Task.WhenAll)
  4. 무엇인지 내가 원하는

, 그것까지 사용 asynchronousy 내 응용 프로그램 매달려하지 않습니다. 처리 할 수있는 것보다 많은 데이터가있는 경우, 더 이상 사용할 수있는 데이터 처리를 완료하는 데 더 이상 테이블의 데이터를 읽지 않아야합니다.

Parallel.ForEach이 처리합니까?

코드 : Stephen Cleary으로 편집되었습니다. 아직 모든 테이블에서 작동하지 않습니다. 반면 그것은 500 테이블을 위해 일하고있다.

나는 스레드의 수보다는 정지 상태로 애플 리케이션 (콘솔 어플리케이션)을 가져 오는 데이터의 양이라고 생각한다. (하나의 쓰레드는 백만 개의 행을 검색 할 수 있으며, 수 천 개가 넘고 메서드가 넘겨지면 그 수는 사전에 추가되므로 더 많은 메모리가 필요하면 가비지 수집이 가능하다.) 또는 구현 한 방법이다 Semaphoreslim 그게 잘못된거야? 당신의 async 이후

public async Task CalculateMetricsForAllTablesAsync() 
{ 
    var allWizardTableNamesTask = GetAllWizardTableNamesAsync(); 
    var allTablesNamesWithLastRunTimeTask = GetAllTableNamesWithLastRunTimeAsync(); 

    await Task.WhenAll(allWizardTableNamesTask, allTablesNamesWithLastRunTimeTask).ConfigureAwait(false); 

    var allWizardTableNames = allWizardTableNamesTask.Result; 
    var allTablesNamesWithLastRunTime = allTablesNamesWithLastRunTimeTask.Result; 

    var throttler = new SemaphoreSlim(10); 
    var concurrentTableProcessingTasks = new ConcurrentStack<Task>(); 

    foreach (var tname in allWizardTableNames) 
    { 
     await throttler.WaitAsync(); 
     try 
     { 
      concurrentTableProcessingTasks.Push(ProcessTableDataAsync(tname, getTableNameWithLastRunTime(tname))); 
     } 
     finally 
     { 
      throttler.Release(); 
     } 
    } 

    await Task.WhenAll(concurrentTableProcessingTasks).ConfigureAwait(false); 

} 

private async Task ProcessTableDataAsync(string tableName, Tuple<string, string> matchingTable) 
{ 
    var tableDataRetrieved = new TaskCompletionSource<bool>(); 
    var metricCountsForEachDay = new ConcurrentDictionary<string, Tuple<int, int>>(); 

    _fromATS.GetTableDataAsync<DynamicTableEntity>(tableName, GetFilter(matchingTable)) 
     .Subscribe(entities => ProcessWizardDataChunk(metricCountsForEachDay, entities),() => tableDataRetrieved.TrySetResult(true)); 

    await tableDataRetrieved.Task; 
    await SaveMetricDataAsync(tableName, metricCountsForEachDay).ConfigureAwait(false); 
} 
+0

코드를 제시해야합니다. – Enigmativity

+0

또한 Rx 또는 TPL/async/await를 사용해야합니다. 둘 중 하나만 가능합니다. 교착 상태가 발생하는 것은 매우 쉽습니다. 개인적으로 나는 당신이하고있는 일에 대해 Rx와 함께 갈 것입니다. – Enigmativity

+0

내가 교착 상태라고 생각하지 않는다. 디버깅을하고 중단 점에서 멈추면 다른 스레드가 작업을하고 더 많은 스레드를 처리하지 않아 메모리가 덜 부담된다. – Saravanan

답변

3

내가 async 수준에서 조절 권하고 싶습니다, 수신을 포장한다. SemaphoreSlim을 정의하고 메소드 논리를 WaitAsync/Release 내에 랩핑하여이를 수행 할 수 있습니다.

또는 TPL 데이터 흐름을 고려하십시오. Dataflow에는 스로틀 링을위한 옵션 (MaxDegreeOfParallelism)이 내장되어 있으며 async 및 Rx와 자연스럽게 상호 운용됩니다.

+0

제어 스레드 수가 데이터 양을 해결합니까? 소수의 스레드로도 여러 테이블에서 더 많은 데이터를 가져올 수 있습니까? – Saravanan

+2

스로틀은 처리를 수행하는 작업의 수를 제한합니다. 업데이트 된 코드가 스로틀 링을 올바르게 적용하지 않습니다 (단지'ProcessTableDataAsync'의 * start *를 조절하고 스택에 작업을 추가하는 것입니다). 대신'try' 내부에'ProcessTableDataAsync'에 모든 코드를 넣어야합니다. –

+0

은'Semaphoreslim'으로 조절을 제대로 할 수 없었습니다. 'TPL Dataflow'로 갔다.비동기 코드를'TPL Dataflow'로 작성하는 것은 쉬워졌습니다. 제한된 수의 테이블로 볼 수있는 퍼포먼스 향상을보고 있습니다. 모든 테이블을 테스트하려고합니다. – Saravanan

관련 문제