2016-11-21 3 views
3

데이터베이스에 광범위한 데이터 삽입을 수행해야합니다. 동시 작업 수를 제한하는 스로틀 링 된 스케줄러로 코드를 멀티 스레드 방식으로 구현할 수 있습니다. 모든 M 행에 블록이 형성되어 원자 조작으로 데이터베이스에 삽입됩니다. 데이터베이스가 데이터 파일을 읽고 구문 분석하는 것보다 속도가 느리기 때문에 여러 동시 작업이 발생합니다. 필자는 종종 멀티 스레딩을 사용하여이 모델을 구현합니다. 동시 System.Threading.Tasks.Task의 수를 제한하십시오.

내가/await를 비동기 사용하여 내 코드를 구현하기로 대신하는 경우

I 동시에 이하 N 동시 작업가 (즉, 데이터베이스로 이동)을 실행하는지 확인하는 방법을 (엔티티 프레임 워크는 비동기 프로그래밍 지원)?

필자의 초기 디자인에서는 원자 적으로 삽입 될 데이터 블록을 읽는 즉시 List<Task>을 인스턴스화하고 새로운 작업을 추가 한 다음 await 모든 작업을 완료 한 후 내 메서드를 반환하도록했습니다. 디자인 타임 문제는 큰 데이터 파일에 대해 완료하는 것보다 빠르게 작업을 수행하기 때문에 동시에 발생하는 숫자가 Task (및 메모리 풋 프린트)이 폭발한다는 것입니다.

SemaphoreSlim을 사용하려고 생각했지만 비동기 프로그래밍 (멀티 스레드와 달리)에 대한 경험이 거의 없습니다. 그래서이 질문에 모범 사례에 대한 피드백을 얻으려고합니다.

+0

내가 확실히는'SemaphoreSlim'이 경우에 갈 수있는 방법이라고 생각을 , 제한된 수의 다중 스레드를 실행하려는 경우. – Gertsen

+0

작업이 IO 바인딩이므로 다중 스레드가 필요하지 않습니다. 추가 스레드없이 여러 개의 병렬 DB 쿼리를 실행할 수 있습니다. – Servy

+1

[여기에는 제한된 동시성 작업 스케줄러가 있습니다.] (https://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler%28v=vs.110%29.aspx?f=255&MSPPError=- 2147217396) (구현을 위해 조금 아래로 스크롤하십시오). –

답변

1

디자인 타임 문제는 (따라서 메모리 풋 프린트) 동시 작업의 수는 작업 때문에 폭발 할 예정이다 큰 데이터 파일에 대해 완료하는 것보다 빠르게 공급됩니다. 대신 내를 구현하기로 결정하는 경우

... 그러나

private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(10); 

async Task ThrottledWorkAsync() 
{ 
    await _semaphore.WaitAsync(); 
    try 
    { 
    await WorkAsync(); 
    } 
    finally 
    { 
    _semaphore.Release(); 
    } 
} 

: 나는 SemaphoreSlim 적절한 동시 비동기 작업을 조절하기위한 선택을하는 SemaphoreSlim

예를 사용하는 방법에 대한 생각 await/async (Entity Framework가 비동기 프로그래밍을 지원함)를 사용하여 코드를 작성하면 동시에 N 개 이상의 동시 작업 (예 : 데이터베이스로 이동)이 실행되지 않도록 할 수 있습니까?

주의해야 할 점은 Entity Framework는 비동기 API를 지원하지만 요청 당 하나의 연결이 필요하다는 것입니다. 따라서 동일한 DbContext으로 여러 개의 동시 비동기 요청을 가질 수는 없습니다. 각각의 동시 요청 (또는 동시 요청에 의해 "차용"된 N 개 이상의 연결)에 대해 별도의 연결을 만들어야합니다.

+0

코드에 문제가있을 수 있습니다. 나는 작업자가 여전히 움직이게되면'StreamReader'를 소비하지 않도록 작업을 시작하기 전에 기다렸습니다. (Dennis의 대답을보십시오) –

+0

@ usr-local-ΕΨΗΕΛΩΝ :'StreamReader'가 무슨 말을하고 있는지 전혀 모르겠습니다. 내 코드는 세마포어 슬롯이 취해질 때까지'WorkAsync'를 시작하지 않을 것입니다. –

+0

필자는 애플리케이션에 피드를 제공하는 데이터 소스를 의미했는데, 제 경우에는이 질문에서 언급하지 않은 'StreamReader'입니다. 음, 네,'WorkAsync' 코드는 실제로 슬롯이 사용 가능할 때까지 시작되지 않지만, 비동기 태스크 ThrottledWorkAsync'를 비동기 wait-perform-release 코드를 대기열에 넣는 for 루프에 랩핑하면 IMO가됩니다. 슬롯을 기다리는 작업으로 인해 TaskExecutor가 막힐 위험이 있습니다. 이 경우 10 개의 작업 만 동시에 실행되지만 더 많은 작업 (메모리 발자국과 함께)이 할당됩니다. 작은 코드로 내 질문 (내일)을 편집해야합니다. –

0

나는 나의 스레드를 실행하기 위해이 코드 조각을 사용 How to limit the amount of concurrent async I/O operations?

그것의 모양은 호출 :

public static async Task WhenAll(this List<Func<Task>> actions, int threadCount) 
{ 
    var _countdownEvent = new CountdownEvent(actions.Count); 
    var _throttler = new SemaphoreSlim(threadCount); 

    foreach (Func<Task> action in actions) 
    { 
     await _throttler.WaitAsync(); 

     Task.Run(async() => 
     { 
      try 
      { 
       await action(); 
      } 
      finally 
      { 
       _throttler.Release(); 
       _countdownEvent.Signal(); 
      } 
     }); 
    } 

    _countdownEvent.Wait(); 
} 

이 코드는이 스레드에서 제공하는 코드 기반으로합니다. 이 모든 작업을 수행하지만, 40 동시 (이 경우) :

var tasks = new List<Func<Task>>(); 
tasks.Add(() => saveAsync()); 
//add more 
await tasks.WhenAll(40); 
+3

여기에'Task.Run'을 사용할 필요가 없습니다. 비동기 작업이 이미 있습니다. 다른 스레드에서 시작해서는 안됩니다. 또한 동기화가 완료 될 때까지 동 기적으로 기다리고 있으므로 비동기 작업임을 고려하면 안됩니다. 비동기식으로 모두 끝내기를 기다려야합니다. – Servy

1

당신이 경우 적어도 n 값 (n 동시 작업의 최대 크기 인) 처음에 삽입하려면 다음과 같은 접근 방식을 취할 수

  1. 호출 InsertAsync()n 시간을 다른 값으로.
  2. 각 작업이 완료되면 InsertAsync() (반복 2)으로 새 전화를 계속하십시오.

그런 식으로 세마포어를 사용하여 동시성 수준을 제어 할 필요가 없으며 비 블로킹이됩니다.

이 시나리오에 유용 할 수 있습니다, 그것은이 방법 Times()Map() 노출 I've just published a package : 예를 들어 https://github.com/jorgebay/concurrent-utils

:

// Execute MyMethodAsync() 1,000,000 times limiting the maximum amount 
// of parallel async operations to 512 
await ConcurrentUtils.Times(1000000, 512, (index) => MyMethodAsync(index)); 
+0

'Map'은 알 수없는 크기의'IEnumerable'으로 작동합니까? 삽입하기 전에 메모리에 모든 레코드를 저장할 필요가 없습니다. 10M 이상이 될 수도 있습니다. –

+0

나는 당신의 접근 방식을 여전히 좋아하지만 답을 읽는 것에서 비 차단 부분이 걱정됩니다.메모리에서 10M 이상의 레코드를 읽으려고하면 'OutOfMemoryException'을 피할 수 없으므로 입력 스트림의 읽기를 차단해야합니다. 영리한 해결책은 내가 평가하는'MemoryFailPoint'를 사용하는 것일 수 있습니다. –

+0

불특정 한 양의 데이터 (백그라운드에서 수시로 검색 됨)로 작업하려면 작업 대기열'ConcurrentUtils.CreateQueue)'. – jorgebg

관련 문제