나는 이것을 몇 시간 동안 알아 내려고 노력해 왔지만 왜 이것이 작동하지 않는지 이해하지 못했습니다. 업로드 할 파일 배치를 허용하는 TPL Dataflow 배치 블록이 있습니다. UploadAsync 메서드를 사용하기 위해 취소 토큰을 사용할 수 있어야합니다. 배치의 각 작업을 서로 병렬로 처리하려고하지만 한 번에 하나의 배치 만 처리하려고합니다.Task.WhenAll을 사용하여 타사 비동기 메서드 호출을 대기하는 방법
var actionBlock = new ActionBlock<TInput>(input =>
{
DoWork(input);
},
new ExecutionDataflowBlockOptions
{
TaskScheduler = new CustomScheduler(1, ApartmentState.MTA, ThreadPriority.Normal)
});
void DoWork(TInput input)
{
var castedMessage = input as Tuple<string, >[];
if (castedMessage == null) return;
ProcessBatch(castedMessage);
}
public void ProcessBatch(Tuple<string, string>[] batch)
{
if (batch == null) return;
Task<Tuple<bool, long>>[] batchUploadTaskArray = new Task<Tuple<bool, long>>[batch.Length * 2];
int taskArrayIndex = 0;
foreach(var job in batch)
{
if (job == null) return;
var chunkSegment = job.Item1;
var indexSegment = job.Item2;
var uploadTask1 = uploader.putFileAsync(job.Item1.LocalFilePath, job.Item1.RemoteChunkFilePath);
uploadTask1.ContinueWith(task =>
{
if(task.Result.Item1)
UploaderStatsManager.Instance.UpdateUploadDuration(task.Result.Item2);
}, TaskScheduler.Default
);
batchUploadTaskArray[taskArrayIndex++] = uploadTask1;
batchUploadTaskArray[taskArrayIndex++] = uploader.putFileAsync(idxLocalFilepath, indexSegment.RemoteIndexFilePath);
}
Task.WaitAll(batchUploadTaskArray);
Console.WriteLine("Done uploading");
}
//Method A - AWS uploader class
CancellationTokenSource cts = new CancellationTokenSource();
public async override Task<Tuple<bool, long>> putFileAsync(string filePath, string destPath)
{
var sw = new Stopwatch();
using (var s3Client = new AmazonS3Client(mAccessKeyId, mSecretAccessKeyId, new AmazonS3Config
{
RegionEndpoint = !String.Equals(mRegion.DisplayName, "Unknown") ? mRegion : RegionEndpoint.USEast1,
}))
{
using (var transferUtility = new TransferUtility(s3Client))
{
using (var file = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
{
var transferRequest = new TransferUtilityUploadRequest
{
Key = destPath.TrimStart('/'),
InputStream = file,
BucketName = mBucketName,
CannedACL = S3CannedACL.PublicRead
};
transferRequest.UploadProgressEvent += DisplayFileProgress;
var fInfo = new FileInfo(filePath);
sw.Start();
await transferUtility.UploadAsync(transferRequest, cts.Token).ContinueWith((task) => sw.Stop());
}
}
}
return new Tuple<bool, long>(true, sw.ElapsedMilliseconds);
}
//Method B - Alternate FTP class
public Task<Tuple<bool, long>> putFileAsync(string filePath, string destPath)
{
return Task<Tuple<bool, long>>.Run(() =>
{
var sw = Stopwatch.StartNew();
var putRC = ftp.putFile(filePath, destPath);
sw.Stop();
return new Tuple<bool, long>(putRC, sw.ElapsedMilliseconds);
});
}
//uploader stats class
public class UploaderStatsManager
{
public ConcurrentQueue<double> uploadDurationQueue;
private static UploaderStatsManager instance;
private UploaderStatsManager()
{
uploadDurationQueue = new ConcurrentQueue<double>();
}
public static UploaderStatsManager Instance
{
get
{
lock (statsLock)
{
if (instance == null)
instance = new UploaderStatsManager();
return instance;
}
}
}
internal void UpdateUploadDuration(double uploadDuration)
{
double durationOut = 0.0;
bool rc = true;
if(uploadDurationQueue.Count >= 12)
rc = uploadDurationQueue.TryDequeue(out durationOut);
if(rc)
uploadDurationQueue.Enqueue(uploadDuration);
}
}
나는 또한 비동기 파일을 넣지 않는 FTP 파일을 넣기 위해 다른 프레임 워크를 사용하고 있습니다. 방법 A를 사용하면 첫 번째 일괄 처리 프로세스가 Task.WaitAll에서 중단됩니다. 방법 B를 사용하는 경우 한 번에 하나의 일괄 처리 만 처리되고 일괄 처리는 Task.WaitAll에서 멈추지 않습니다. ProcessBatch 메서드를 비동기 메서드로 변경하고 Task.WaitAll 대신 "await Task.WhenAll"을 사용하려고했습니다.이 메서드는 MethodA와 MethodB에서 작동하지만 일괄 처리는 병렬 처리되지 않습니다.
도움 주셔서 감사합니다.
'uploadTask1.ContinueWith'의 결과를 버리고 있습니다. 아마도, 당신은 그걸 기다려야 할 것입니다. 이 문제를 해결하면 다시 살펴 보겠습니다. 일반적으로 ContinueWith를 사용하지 말고'await'을 사용하는 것이 좋습니다. 때로는 다른 코드 나 비동기 델리게이트로 코드를 추출해야하는 경우도 있습니다. – usr
그것은 버려지고 있지 않습니다. 간결함을 위해 코드를 생략했습니다. – zjacobs
그런 다음 해당 코드는 분명히 무시할 수없는 버그가 있으므로 해당 코드를 추가하십시오. – usr