많은 비동기 클라이언트가 요청을하고 응답을 기다릴 대기열을 제공하는 서비스를 작성하려고합니다. Y 기간 당 X 요청에 의해 대기열 처리를 조절할 수 있어야합니다. 예 : 초당 50 건의 웹 요청. 그것은 초당 X 요청만을 발행 할 수있는 타사 REST 서비스를위한 것입니다.비동기 시간 및 일괄 처리 소비량이있는 생산자/소비자
많은 질문을 찾았습니다. TPL Dataflow를 사용하는 길을 안내해주었습니다. TranformBlock을 사용하여 사용자 지정 조절 기능을 제공 한 다음 X 개의 ActionBlocks를 사용하여 병렬 작업을 완료했습니다. Action의 구현은 다소 어수선해 보입니다. Task가 파이프 라인에 전달되어 한 번 완료되면 호출자에게 알리는 더 좋은 방법이 있는지 궁금합니다.
내가 원하는 것을 수행하는 데있어 더 좋거나 더/더 간단한 방법이 있는지 궁금합니다. 내 구현에 눈부신 문제가 있습니까? 취소 및 예외 처리가 누락되었음을 알고 있으며 다음에이 작업을 수행 할 예정이지만 의견을 가장 환영합니다.
저는 Extended Stephen Cleary's example for my Dataflow pipeline입니다.
svick's concept of a time throttled TransformBlock입니다. 나는 내가 구축 한 것이 순수한 SemaphoreSlim design으로 쉽게 달성 될 수 있을지 궁금해하고있다. 시간을 기반으로하는 최대 작동의 조절이다.
다음은 최신 구현입니다. FIFO 큐 비동기 큐에서 사용자 지정 동작을 전달할 수 있습니다.
public class ThrottledProducerConsumer<T>
{
private class TimerState<T1>
{
public SemaphoreSlim Sem;
public T1 Value;
}
private BufferBlock<T> _queue;
private IPropagatorBlock<T, T> _throttleBlock;
private List<Task> _consumers;
private static IPropagatorBlock<T1, T1> CreateThrottleBlock<T1>(TimeSpan Interval, Int32 MaxPerInterval)
{
SemaphoreSlim _sem = new SemaphoreSlim(MaxPerInterval);
return new TransformBlock<T1, T1>(async (x) =>
{
var sw = new Stopwatch();
sw.Start();
//Console.WriteLine($"Current count: {_sem.CurrentCount}");
await _sem.WaitAsync();
sw.Stop();
var now = DateTime.UtcNow;
var releaseTime = now.Add(Interval) - now;
//-- Using timer as opposed to Task.Delay as I do not want to await or wait for it to complete
var tm = new Timer((s) => {
var state = (TimerState<T1>)s;
//Console.WriteLine($"RELEASE: {state.Value} was released {DateTime.UtcNow:mm:ss:ff} Reset Sem");
state.Sem.Release();
}, new TimerState<T1> { Sem = _sem, Value = x }, (int)Interval.TotalMilliseconds,
-1);
/*
Task.Delay(delay).ContinueWith((t)=>
{
Console.WriteLine($"RELEASE(FAKE): {x} was released {DateTime.UtcNow:mm:ss:ff} Reset Sem");
//_sem.Release();
});
*/
//Console.WriteLine($"{x} was tramsformed in {sw.ElapsedMilliseconds}ms. Will release {now.Add(Interval):mm:ss:ff}");
return x;
},
//new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
//
new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 10 });
}
public ThrottledProducerConsumer(TimeSpan Interval, int MaxPerInterval, Int32 QueueBoundedMax = 5, Action<T> ConsumerAction = null, Int32 MaxConsumers = 1)
{
var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1, };
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true, };
//-- Create the Queue
_queue = new BufferBlock<T>(new DataflowBlockOptions { BoundedCapacity = QueueBoundedMax, });
//-- Create and link the throttle block
_throttleBlock = CreateThrottleBlock<T>(Interval, MaxPerInterval);
_queue.LinkTo(_throttleBlock, linkOptions);
//-- Create and link the consumer(s) to the throttle block
var consumerAction = (ConsumerAction != null) ? ConsumerAction : new Action<T>(ConsumeItem);
_consumers = new List<Task>();
for (int i = 0; i < MaxConsumers; i++)
{
var consumer = new ActionBlock<T>(consumerAction, consumerOptions);
_throttleBlock.LinkTo(consumer, linkOptions);
_consumers.Add(consumer.Completion);
}
//-- TODO: Add some cancellation tokens to shut this thing down
}
/// <summary>
/// Default Consumer Action, just prints to console
/// </summary>
/// <param name="ItemToConsume"></param>
private void ConsumeItem(T ItemToConsume)
{
Console.WriteLine($"Consumed {ItemToConsume} at {DateTime.UtcNow}");
}
public async Task EnqueueAsync(T ItemToEnqueue)
{
await this._queue.SendAsync(ItemToEnqueue);
}
public async Task EnqueueItemsAsync(IEnumerable<T> ItemsToEnqueue)
{
foreach (var item in ItemsToEnqueue)
{
await this._queue.SendAsync(item);
}
}
public async Task CompleteAsync()
{
this._queue.Complete();
await Task.WhenAll(_consumers);
Console.WriteLine($"All consumers completed {DateTime.UtcNow}");
}
}
시험 방법
public class WorkItem<T>
{
public TaskCompletionSource<T> tcs;
//public T respone;
public string url;
public WorkItem(string Url)
{
tcs = new TaskCompletionSource<T>();
url = Url;
}
public override string ToString()
{
return $"{url}";
}
}
public static void TestQueue()
{
Console.WriteLine("Created the queue");
var defaultAction = new Action<WorkItem<String>>(async i => {
var taskItem = ((WorkItem<String>)i);
Console.WriteLine($"Consuming: {taskItem.url} {DateTime.UtcNow:mm:ss:ff}");
//-- Assume calling another async method e.g. await httpClient.DownloadStringTaskAsync(url);
await Task.Delay(5000);
taskItem.tcs.SetResult($"{taskItem.url}");
//Console.WriteLine($"Consumed: {taskItem.url} {DateTime.UtcNow}");
});
var queue = new ThrottledProducerConsumer<WorkItem<String>>(TimeSpan.FromMilliseconds(2000), 5, 2, defaultAction);
var results = new List<Task>();
foreach (var no in Enumerable.Range(0, 20))
{
var workItem = new WorkItem<String>($"http://someurl{no}.com");
results.Add(queue.EnqueueAsync(workItem));
results.Add(workItem.tcs.Task);
results.Add(workItem.tcs.Task.ContinueWith(response =>
{
Console.WriteLine($"Received: {response.Result} {DateTime.UtcNow:mm:ss:ff}");
}));
}
Task.WhenAll(results).Wait();
Console.WriteLine("All Work Items Have Been Processed");
}
몇 가지 생각 : 1) C#을 사용 중이며 웹 요청을 언급하고 있습니다. IIS는 꽤 많이 이것을 수행합니다 (출력 측면이 아닌 입력 측면에서도 조절 포함). 2) 아마도 ConcurrentQueue만큼이나 간단한 것은 조절을 위해 SemaphoreSlim을 사용하여 동시성을 처리하는 데 필요한 모든 것입니다. 3) 궁극적으로이를 확장하려는 경우, 특히 요청을 처리하는 여러 시스템에 대해 서비스 버스가 더 좋은 옵션 일 수 있습니까? 그러나 스로틀 링은 더 어려울 것입니다. – sellotape
대기열의 끝에 다른 블록을 추가하지 않으면 호출자에게 처리 된 요청에 대해 알릴 수 있습니다. – VMAtm
@VMAtm 그게 방법이라고 생각하지만, 작업 위임자 또는 작업 완료시 호출자에게 알릴 대기열에 전달하는 방법에 대해 생각하고있었습니다. – Nicholas