2011-11-26 2 views
4

TPL을 C# (.NET 4.0)에서 사용하고 있습니다.LimitedConcurrencyLevelTaskScheduler를 사용하면 계속 작업이 중단됩니다.

웹 요청 생성을 쉽게하고 (연속 작업을 사용하여 비동기 적으로) 콘텐츠를 다운로드하는 맞춤 API를 만들었습니다. 그 부분은 잘 작동합니다.

LimitedConcurrencyLevelTaskScheduler (Samples for Parallel ProgrammingMSDN documentation for tasks에서 찾을 수 있음)을 지연된 작업 생성과 함께 사용하려고하면이 문제가 발생합니다. 해당 클래스에 익숙하지 않은 경우 예약 된 작업의 동시성 정도가 임의의 숫자로 제한됩니다.

기본적으로 웹 요청 작업 체인 생성을 LimitedConcurrencyLevelTaskScheduler에 의해 예약 된 작업으로 연기하여 동시 다운로드 수를 제한 할 수 있습니다. Task의 생성을 연기 할 때

As suggested by the sage Stephen Toub

는, 할 수있는 가장 좋은 것은 Func<Task> 또는 Func<Task<TResult>>를 반환하기 위해 API를 설계하는 것입니다. 나는 이것을했다.

아쉽게도 첫 번째 동시 작업 세트를 예약 한 후 내 프로그램이 중단됩니다. 내 작업이 4 도의 동시성으로 제한되어 있다고 가정 해보십시오. 이 경우 4 개의 작업이 시작되고 프로그램이 중단됩니다. 작업이 완료되지 않습니다.

나는이 문제를 간단히 설명하기 위해 최소한의 예를 만들었습니다. WebRequest을 사용하는 대신 파일 읽기를 사용하고 있습니다. 내가 "는 응답"에 의해 무슨 뜻인지 명확히하기 위해 1

class Program 
{ 
    static Func<Task> GetReadTask() 
    { 
     return() => 
     { 
      Console.WriteLine("Opening file."); 

      FileStream fileStream = File.Open("C:\\Users\\Joel\\Desktop\\1.txt", FileMode.Open); 

      byte[] buffer = new byte[32]; 

      Console.WriteLine("Beginning read."); 
      return Task<int>.Factory.FromAsync(fileStream.BeginRead, fileStream.EndRead, buffer, 0, 32, null).ContinueWith(task => fileStream.Close()); 
     }; 
    } 

    static void Main() 
    { 
     LimitedConcurrencyLevelTaskScheduler ts = new LimitedConcurrencyLevelTaskScheduler(1); 
     TaskFactory factory = new TaskFactory(ts); 

     int[] range = {1, 2, 3}; 

     var tasks = range.Select(number => 
     { 
      Func<Task> getTask = GetReadTask(); 
      return factory.StartNew(() => 
      { 
       var task = getTask(); 
       task.Wait(); 
      }); 
     }); 

     Task.WaitAll(tasks.ToArray()); 
    } 
} 

에 동시성의 정도를 제한이 출력이 모습입니다.

Opening file. 
Beginning read. 

그리고 나서 영원히 아무것도 인쇄되지 않습니다.

무슨 일이 일어나고 있는지에 대한 단서가 있습니까?

답변

5

좋은 질문입니다!

첫째, 나는 LimitedConcurrencyLevelTaskScheduler이 학문적으로 올바른 해결책이라고 확신하지 못합니다. 이를 통해 동시 요청 수를 N 개로 제한하려면 먼저 N 개의 작업을 차단해야 APM 비동기 호출을 사용하는 목적을 무력화해야합니다.

그렇기 때문에 대안보다는 구현하는 것이 훨씬 쉽습니다. 작업 대기열이 있어야하고 비행 요청 수를 유지 한 다음 필요에 따라 작업자 작업을 생성해야합니다. 그건 당연한 일이 아니며 동시 요청 수 N이 적 으면 N 개의 차단 된 스레드가 세상 끝이 아닐 것입니다.

코드에있는 문제점은 다른 태스크에서 작성된 태스크가 상위 태스크의 스케줄러를 사용한다는 것입니다. 사실 FromAsync으로 만든 작업은 기본 APM 구현을 사용하기 때문에 사실이 아니므로 약간 다릅니다. ,

return factory.StartNew(() => 
    { 
     var task = getTask(); 
     task.Wait(); 
    } 
); 

factoryLimitedConcurrencyLevelTaskScheduler(1)을 사용하므로 단지 1 이러한 작업을 동시에 실행할 수 있으며, 그 중 하나는 작업이 getTask()에서 반환 기다리고 :

당신은에 Main에서 작업을 만듭니다.

따라서 GetReadTask에서 Task<int>.Factory.FromAsync으로 전화하십시오. FromAsync이 상위 작업의 스케줄러를 따르지 않기 때문에이 작업이 실행됩니다.

그런 다음 .ContinueWith(task => fileStream.Close())과 함께 연속을 만듭니다. 이것은 부모의 스케줄러를 존중하는 태스크를 생성합니다. LimitedConcurrencyLevelTaskScheduler은 이미 차단 된 작업 (Main에있는 작업)을 실행하고 있기 때문에 계속 작업을 실행할 수없고 교착 상태에 빠졌습니다.

해결 방법은 TaskScheduler.Default 인 일반 스레드 풀 스레드에서 연속을 실행하는 것입니다. 그런 다음 실행되고 교착 상태가 깨졌습니다.

static void Main() 
{ 
    LimitedConcurrencyLevelTaskScheduler ts = new LimitedConcurrencyLevelTaskScheduler(1); 

    int[] range = { 1, 2, 3 }; 

    var tasks = range.Select(number => 
     { 
      var task = QueueReadTask(ts, number); 

      return task.ContinueWith(t => Output.Write("Number " + number + " completed")); 
     } 
    ) 
    .ToArray(); 

    Output.Write("Waiting for " + tasks.Length + " tasks: " + String.Join(" ", tasks.Select(t => t.Status).ToArray())); 

    Task.WaitAll(tasks); 

    Output.Write("WaitAll complete for " + tasks.Length + " tasks: " + String.Join(" ", tasks.Select(t => t.Status).ToArray())); 
} 

몇 가지주의 할 수 있습니다이 :

이동 QueueReadTasktask.Wait() 더를 만드는

static Task QueueReadTask(TaskScheduler ts, int number) 
{ 
    Output.Write("QueueReadTask(" + number + ")"); 

    return Task.Factory.StartNew(() => 
     { 
      Output.Write("Opening file " + number + "."); 

      FileStream fileStream = File.Open("D:\\1KB.txt", FileMode.Open, FileAccess.Read, FileShare.Read); 

      byte[] buffer = new byte[ 32 ]; 

      var tRead = Task<int>.Factory.FromAsync(fileStream.BeginRead, fileStream.EndRead, buffer, 0, 32, null); 

      var tClose = tRead.ContinueWith(task => 
        { 
         Output.Write("Closing file " + number + ". Read " + task.Result + " bytes."); 
         fileStream.Close(); 
        } 
        , TaskScheduler.Default 
       ); 

      tClose.Wait(); 
     } 
     , CancellationToken.None 
     , TaskCreationOptions.None 
     , ts 
    ); 
} 

그리고 Main 지금은 다음과 같습니다 : 여기

내 솔루션입니다 당신이 작업을 차단하고 있다는 것이 명백합니다. 대체로 FromAsync 통화 및 계속 통화를 제거하고 어쨌든 차단하고 있으므로 정상적인 동기 호출로 대체 할 수 있습니다.

QueueReadTask에서 반환 된 작업은 계속 될 수 있습니다. 기본적으로 상위 태스크 스케줄러는 상위 태스크 스케줄러를 상속하므로 기본 스케줄러에서 실행됩니다. 이 경우 상위 타스크가 없으므로 기본 스케줄러가 사용됩니다.

+0

이것은 내 질문을 완벽하게 설명하지 못합니다! 예제 코드도 가져 주셔서 감사합니다. 그러나 작업 대기열을 구현하여 현재 임의의 수의 스레드 만 현재 "진행 중"이되도록합니다. 다시 한 번 감사드립니다! –

관련 문제