2014-06-13 4 views
0

나는 이것을 몇 시간 동안 알아 내려고 노력해 왔지만 왜 이것이 작동하지 않는지 이해하지 못했습니다. 업로드 할 파일 배치를 허용하는 TPL Dataflow 배치 블록이 있습니다. UploadAsync 메서드를 사용하기 위해 취소 토큰을 사용할 수 있어야합니다. 배치의 각 작업을 서로 병렬로 처리하려고하지만 한 번에 하나의 배치 만 처리하려고합니다.Task.WhenAll을 사용하여 타사 비동기 메서드 호출을 대기하는 방법

var actionBlock = new ActionBlock<TInput>(input => 
{ 
    DoWork(input); 
}, 
new ExecutionDataflowBlockOptions 
{ 
    TaskScheduler = new CustomScheduler(1, ApartmentState.MTA, ThreadPriority.Normal) 
}); 

void DoWork(TInput input) 
{ 
    var castedMessage = input as Tuple<string, >[]; 
    if (castedMessage == null) return; 
     ProcessBatch(castedMessage); 
} 

public void ProcessBatch(Tuple<string, string>[] batch) 
{ 
    if (batch == null) return; 


    Task<Tuple<bool, long>>[] batchUploadTaskArray = new Task<Tuple<bool, long>>[batch.Length * 2]; 

    int taskArrayIndex = 0; 

    foreach(var job in batch) 
    { 
     if (job == null) return; 

     var chunkSegment = job.Item1; 
     var indexSegment = job.Item2; 

     var uploadTask1 = uploader.putFileAsync(job.Item1.LocalFilePath, job.Item1.RemoteChunkFilePath); 
     uploadTask1.ContinueWith(task => 
      { 
       if(task.Result.Item1) 
        UploaderStatsManager.Instance.UpdateUploadDuration(task.Result.Item2); 
      }, TaskScheduler.Default 
     ); 

     batchUploadTaskArray[taskArrayIndex++] = uploadTask1; 
     batchUploadTaskArray[taskArrayIndex++] = uploader.putFileAsync(idxLocalFilepath, indexSegment.RemoteIndexFilePath); 
    } 

    Task.WaitAll(batchUploadTaskArray);    
    Console.WriteLine("Done uploading"); 
} 


//Method A - AWS uploader class 
CancellationTokenSource cts = new CancellationTokenSource(); 
public async override Task<Tuple<bool, long>> putFileAsync(string filePath, string destPath) 
{ 
    var sw = new Stopwatch(); 

    using (var s3Client = new AmazonS3Client(mAccessKeyId, mSecretAccessKeyId, new AmazonS3Config 
    { 
     RegionEndpoint = !String.Equals(mRegion.DisplayName, "Unknown") ? mRegion : RegionEndpoint.USEast1, 
    })) 
    { 
     using (var transferUtility = new TransferUtility(s3Client)) 
     { 
      using (var file = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read)) 
      { 
       var transferRequest = new TransferUtilityUploadRequest 
       { 
        Key = destPath.TrimStart('/'), 
        InputStream = file, 
        BucketName = mBucketName, 
        CannedACL = S3CannedACL.PublicRead 
       }; 

       transferRequest.UploadProgressEvent += DisplayFileProgress; 

       var fInfo = new FileInfo(filePath); 
       sw.Start(); 
       await transferUtility.UploadAsync(transferRequest, cts.Token).ContinueWith((task) => sw.Stop()); 
      } 
     } 
    } 

    return new Tuple<bool, long>(true, sw.ElapsedMilliseconds); 
} 


//Method B - Alternate FTP class 
public Task<Tuple<bool, long>> putFileAsync(string filePath, string destPath) 
{ 
    return Task<Tuple<bool, long>>.Run(() => 
    { 
     var sw = Stopwatch.StartNew(); 
     var putRC = ftp.putFile(filePath, destPath); 
     sw.Stop(); 
     return new Tuple<bool, long>(putRC, sw.ElapsedMilliseconds); 
    }); 
} 


//uploader stats class 
public class UploaderStatsManager 
{ 
    public ConcurrentQueue<double> uploadDurationQueue; 
    private static UploaderStatsManager instance; 

    private UploaderStatsManager() 
    { 
     uploadDurationQueue = new ConcurrentQueue<double>(); 
    } 

    public static UploaderStatsManager Instance 
    { 
     get 
     { 
      lock (statsLock) 
      { 
       if (instance == null) 
        instance = new UploaderStatsManager(); 
       return instance; 
      } 
     } 
    } 

    internal void UpdateUploadDuration(double uploadDuration) 
    { 
     double durationOut = 0.0; 
     bool rc = true; 

     if(uploadDurationQueue.Count >= 12) 
      rc = uploadDurationQueue.TryDequeue(out durationOut); 

     if(rc) 
      uploadDurationQueue.Enqueue(uploadDuration); 
    } 
} 

나는 또한 비동기 파일을 넣지 않는 FTP 파일을 넣기 위해 다른 프레임 워크를 사용하고 있습니다. 방법 A를 사용하면 첫 번째 일괄 처리 프로세스가 Task.WaitAll에서 중단됩니다. 방법 B를 사용하는 경우 한 번에 하나의 일괄 처리 만 처리되고 일괄 처리는 Task.WaitAll에서 멈추지 않습니다. ProcessBatch 메서드를 비동기 메서드로 변경하고 Task.WaitAll 대신 "await Task.WhenAll"을 사용하려고했습니다.이 메서드는 MethodA와 MethodB에서 작동하지만 일괄 처리는 병렬 처리되지 않습니다.

도움 주셔서 감사합니다.

+2

'uploadTask1.ContinueWith'의 결과를 버리고 있습니다. 아마도, 당신은 그걸 기다려야 할 것입니다. 이 문제를 해결하면 다시 살펴 보겠습니다. 일반적으로 ContinueWith를 사용하지 말고'await'을 사용하는 것이 좋습니다. 때로는 다른 코드 나 비동기 델리게이트로 코드를 추출해야하는 경우도 있습니다. – usr

+0

그것은 버려지고 있지 않습니다. 간결함을 위해 코드를 생략했습니다. – zjacobs

+0

그런 다음 해당 코드는 분명히 무시할 수없는 버그가 있으므로 해당 코드를 추가하십시오. – usr

답변

0

나는 이것을 알아 냈다. putFileAsync 메서드 내에서 uploadAsync는 작업을 반환합니다. 나는 그 작업에 대해 Task.WaitAll을 호출하고 메서드 헤더에서 async 키워드를 제거한다.

public override Task<Tuple<bool, long>> putFileAsync(string filePath, string destPath) 
{ 
    return Task<Tuple<bool, long>>.Run(() => 
    { 
     var sw = new Stopwatch(); 

     using (var s3Client = new AmazonS3Client(mAccessKeyId, mSecretAccessKeyId, new AmazonS3Config 
     { 
      RegionEndpoint = !String.Equals(mRegion.DisplayName, "Unknown") ? mRegion : RegionEndpoint.USEast1, 
     })) 
     { 
      using (var transferUtility = new TransferUtility(s3Client)) 
      { 
       using (var file = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read)) 
       { 
        var transferRequest = new TransferUtilityUploadRequest 
        { 
         Key = destPath.TrimStart('/'), 
         InputStream = file, 
         BucketName = mBucketName, 
         CannedACL = S3CannedACL.PublicRead 
        }; 

        transferRequest.UploadProgressEvent += DisplayFileProgress; 

        var fInfo = new FileInfo(filePath); 
        sw.Start(); 
        Task uploadTask = transferUtility.UploadAsync(transferRequest, cts.Token).ContinueWith((task)=>sw.Stop()); 
        Task.WaitAll(uploadTask); 
       } 
      } 
     } 

     return new Tuple<bool, long>(true, sw.ElapsedMilliseconds); 
    }); 
} 
관련 문제