1

많은 비동기 클라이언트가 요청을하고 응답을 기다릴 대기열을 제공하는 서비스를 작성하려고합니다. Y 기간 당 X 요청에 의해 대기열 처리를 조절할 수 있어야합니다. 예 : 초당 50 건의 웹 요청. 그것은 초당 X 요청만을 발행 할 수있는 타사 REST 서비스를위한 것입니다.비동기 시간 및 일괄 처리 소비량이있는 생산자/소비자

많은 질문을 찾았습니다. TPL Dataflow를 사용하는 길을 안내해주었습니다. TranformBlock을 사용하여 사용자 지정 조절 기능을 제공 한 다음 X 개의 ActionBlocks를 사용하여 병렬 작업을 완료했습니다. Action의 구현은 다소 어수선해 보입니다. Task가 파이프 라인에 전달되어 한 번 완료되면 호출자에게 알리는 더 좋은 방법이 있는지 궁금합니다.

내가 원하는 것을 수행하는 데있어 더 좋거나 더/더 간단한 방법이 있는지 궁금합니다. 내 구현에 눈부신 문제가 있습니까? 취소 및 예외 처리가 누락되었음을 알고 있으며 다음에이 작업을 수행 할 예정이지만 의견을 가장 환영합니다.

저는 Extended Stephen Cleary's example for my Dataflow pipeline입니다.
svick's concept of a time throttled TransformBlock입니다. 나는 내가 구축 한 것이 순수한 SemaphoreSlim design으로 쉽게 달성 될 수 있을지 궁금해하고있다. 시간을 기반으로하는 최대 작동의 조절이다.

다음은 최신 구현입니다. FIFO 큐 비동기 큐에서 사용자 지정 동작을 전달할 수 있습니다.

public class ThrottledProducerConsumer<T> 
{ 
    private class TimerState<T1> 
    { 
     public SemaphoreSlim Sem; 
     public T1 Value; 
    } 

    private BufferBlock<T> _queue; 
    private IPropagatorBlock<T, T> _throttleBlock; 
    private List<Task> _consumers; 

    private static IPropagatorBlock<T1, T1> CreateThrottleBlock<T1>(TimeSpan Interval, Int32 MaxPerInterval) 
    { 
     SemaphoreSlim _sem = new SemaphoreSlim(MaxPerInterval); 
     return new TransformBlock<T1, T1>(async (x) => 
     { 
      var sw = new Stopwatch(); 
      sw.Start(); 
      //Console.WriteLine($"Current count: {_sem.CurrentCount}"); 
      await _sem.WaitAsync(); 

      sw.Stop(); 
      var now = DateTime.UtcNow; 
      var releaseTime = now.Add(Interval) - now; 

      //-- Using timer as opposed to Task.Delay as I do not want to await or wait for it to complete 
      var tm = new Timer((s) => { 
       var state = (TimerState<T1>)s; 
       //Console.WriteLine($"RELEASE: {state.Value} was released {DateTime.UtcNow:mm:ss:ff} Reset Sem"); 
       state.Sem.Release(); 

      }, new TimerState<T1> { Sem = _sem, Value = x }, (int)Interval.TotalMilliseconds, 
      -1); 

      /* 
      Task.Delay(delay).ContinueWith((t)=> 
      { 
       Console.WriteLine($"RELEASE(FAKE): {x} was released {DateTime.UtcNow:mm:ss:ff} Reset Sem"); 
       //_sem.Release(); 
      }); 
      */ 

      //Console.WriteLine($"{x} was tramsformed in {sw.ElapsedMilliseconds}ms. Will release {now.Add(Interval):mm:ss:ff}"); 
      return x; 
     }, 
      //new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }); 
      // 
      new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 10 }); 
    } 

    public ThrottledProducerConsumer(TimeSpan Interval, int MaxPerInterval, Int32 QueueBoundedMax = 5, Action<T> ConsumerAction = null, Int32 MaxConsumers = 1) 
    { 
     var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1, }; 
     var linkOptions = new DataflowLinkOptions { PropagateCompletion = true, }; 

     //-- Create the Queue 
     _queue = new BufferBlock<T>(new DataflowBlockOptions { BoundedCapacity = QueueBoundedMax, }); 

     //-- Create and link the throttle block 
     _throttleBlock = CreateThrottleBlock<T>(Interval, MaxPerInterval); 
     _queue.LinkTo(_throttleBlock, linkOptions); 

     //-- Create and link the consumer(s) to the throttle block 
     var consumerAction = (ConsumerAction != null) ? ConsumerAction : new Action<T>(ConsumeItem); 
     _consumers = new List<Task>(); 
     for (int i = 0; i < MaxConsumers; i++) 
     { 
      var consumer = new ActionBlock<T>(consumerAction, consumerOptions); 
      _throttleBlock.LinkTo(consumer, linkOptions); 
      _consumers.Add(consumer.Completion); 
     } 

     //-- TODO: Add some cancellation tokens to shut this thing down 
    } 

    /// <summary> 
    /// Default Consumer Action, just prints to console 
    /// </summary> 
    /// <param name="ItemToConsume"></param> 
    private void ConsumeItem(T ItemToConsume) 
    { 
     Console.WriteLine($"Consumed {ItemToConsume} at {DateTime.UtcNow}"); 
    } 

    public async Task EnqueueAsync(T ItemToEnqueue) 
    { 
     await this._queue.SendAsync(ItemToEnqueue); 
    } 

    public async Task EnqueueItemsAsync(IEnumerable<T> ItemsToEnqueue) 
    { 
     foreach (var item in ItemsToEnqueue) 
     { 
      await this._queue.SendAsync(item); 
     } 
    } 

    public async Task CompleteAsync() 
    { 
     this._queue.Complete(); 
     await Task.WhenAll(_consumers); 
     Console.WriteLine($"All consumers completed {DateTime.UtcNow}"); 
    } 
} 

시험 방법

public class WorkItem<T> 
    { 
     public TaskCompletionSource<T> tcs; 
     //public T respone; 
     public string url; 
     public WorkItem(string Url) 
     { 
      tcs = new TaskCompletionSource<T>(); 
      url = Url; 
     } 
     public override string ToString() 
     { 
      return $"{url}"; 
     } 
    } 

    public static void TestQueue() 
    { 
     Console.WriteLine("Created the queue"); 

     var defaultAction = new Action<WorkItem<String>>(async i => { 
      var taskItem = ((WorkItem<String>)i); 
      Console.WriteLine($"Consuming: {taskItem.url} {DateTime.UtcNow:mm:ss:ff}"); 
      //-- Assume calling another async method e.g. await httpClient.DownloadStringTaskAsync(url); 
      await Task.Delay(5000); 
      taskItem.tcs.SetResult($"{taskItem.url}"); 
      //Console.WriteLine($"Consumed: {taskItem.url} {DateTime.UtcNow}"); 
     }); 

     var queue = new ThrottledProducerConsumer<WorkItem<String>>(TimeSpan.FromMilliseconds(2000), 5, 2, defaultAction); 

     var results = new List<Task>(); 
     foreach (var no in Enumerable.Range(0, 20)) 
     { 
      var workItem = new WorkItem<String>($"http://someurl{no}.com"); 
      results.Add(queue.EnqueueAsync(workItem)); 
      results.Add(workItem.tcs.Task); 
      results.Add(workItem.tcs.Task.ContinueWith(response => 
      { 
       Console.WriteLine($"Received: {response.Result} {DateTime.UtcNow:mm:ss:ff}"); 
      })); 
     } 

     Task.WhenAll(results).Wait(); 
     Console.WriteLine("All Work Items Have Been Processed"); 
    } 
+1

몇 가지 생각 : 1) C#을 사용 중이며 웹 요청을 언급하고 있습니다. IIS는 꽤 많이 이것을 수행합니다 (출력 측면이 아닌 입력 측면에서도 조절 포함). 2) 아마도 ConcurrentQueue만큼이나 간단한 것은 조절을 위해 SemaphoreSlim을 사용하여 동시성을 처리하는 데 필요한 모든 것입니다. 3) 궁극적으로이를 확장하려는 경우, 특히 요청을 처리하는 여러 시스템에 대해 서비스 버스가 더 좋은 옵션 일 수 있습니까? 그러나 스로틀 링은 더 어려울 것입니다. – sellotape

+0

대기열의 끝에 다른 블록을 추가하지 않으면 호출자에게 처리 된 요청에 대해 알릴 수 있습니다. – VMAtm

+0

@VMAtm 그게 방법이라고 생각하지만, 작업 위임자 또는 작업 완료시 호출자에게 알릴 대기열에 전달하는 방법에 대해 생각하고있었습니다. – Nicholas

답변

1

요구 때문에, 나는 TPL 데이터 흐름에 따라 ThrottledConsumerProducer 클래스를 만들었습니다. 그것은 대기열에 넣어졌고 순서대로 완료된 동시 생산자 (문제없이 약 281k)를 포함하여 여러 날 동안 테스트되었지만 아직 발견하지 못한 버그가 있습니다. 스로틀과 내가 필요로 차단을 제공

  • TransformaBlock :

    1. 내가 비동기 대기열로 BufferBlock를 사용하고,이이 연결되어 있습니다. 최대 요청을 제어하기 위해 SempahoreSlim과 함께 사용됩니다. 각 항목은 블록을 통과 할 때마다 세마포어를 증가시키고 나중에 세마포어를 해제하기 위해 X duration을 실행하도록 작업을 예약합니다. 이 방법으로 나는 지속 기간마다 X 요청의 슬라이딩 윈도우를가집니다. 정확히 내가 원했던 것. TPL 덕분에 연결 대상에 병렬 처리를 활용하고 있습니다. 필요한 작업을 수행 할 책임이있는
    2. ActionBlock (s).

    클래스는 일반적인 요소이므로 유사한 항목이 필요한 경우 다른 사용자에게 유용 할 수 있습니다. 나는 취소 또는 오류 처리를 작성하지는 않았지만이를 옮기려면 응답으로 표시해야한다고 생각했습니다. 내 대답을 받아 들일 수있는 답변으로보기보다는 몇 가지 대안과 피드백을 보게되어 매우 기쁩니다. 읽어 주셔서 감사합니다.

    참고 : 세마포어가 최대 값보다 많이 나오게하는 기묘한 작업을 수행하면서 원래 구현에서 Timer를 제거했습니다. 동적 요청이 있다고 가정합니다. 동시 요청을 실행할 때 발생했습니다. Task.Delay를 사용하여 세마포어 잠금 해제 일정을 잡았습니다.

    축소됨 생산자 소비자

    public class ThrottledProducerConsumer<T> 
    { 
        private BufferBlock<T> _queue; 
        private IPropagatorBlock<T, T> _throttleBlock; 
        private List<Task> _consumers; 
    
        private static IPropagatorBlock<T1, T1> CreateThrottleBlock<T1>(TimeSpan Interval, 
         Int32 MaxPerInterval, Int32 BlockBoundedMax = 2, Int32 BlockMaxDegreeOfParallelism = 2) 
        { 
         SemaphoreSlim _sem = new SemaphoreSlim(MaxPerInterval, MaxPerInterval); 
         return new TransformBlock<T1, T1>(async (x) => 
         { 
          //Log($"Transform blk: {x} {DateTime.UtcNow:mm:ss:ff} Semaphore Count: {_sem.CurrentCount}"); 
          var sw = new Stopwatch(); 
          sw.Start(); 
          //Console.WriteLine($"Current count: {_sem.CurrentCount}"); 
          await _sem.WaitAsync(); 
    
          sw.Stop(); 
          var delayTask = Task.Delay(Interval).ContinueWith((t) => 
          { 
           //Log($"Pre-RELEASE: {x} {DateTime.UtcNow:mm:ss:ff} Semaphore Count {_sem.CurrentCount}"); 
           _sem.Release(); 
           //Log($"PostRELEASE: {x} {DateTime.UtcNow:mm:ss:ff} Semaphoere Count {_sem.CurrentCount}"); 
          }); 
          //},TaskScheduler.FromCurrentSynchronizationContext());     
          //Log($"Transformed: {x} in queue {sw.ElapsedMilliseconds}ms. {DateTime.Now:mm:ss:ff} will release {DateTime.Now.Add(Interval):mm:ss:ff} Semaphoere Count {_sem.CurrentCount}"); 
          return x; 
         }, 
          //-- Might be better to keep Bounded Capacity in sync with the semaphore 
          new ExecutionDataflowBlockOptions { BoundedCapacity = BlockBoundedMax, 
           MaxDegreeOfParallelism = BlockMaxDegreeOfParallelism }); 
        } 
    
        public ThrottledProducerConsumer(TimeSpan Interval, int MaxPerInterval, 
         Int32 QueueBoundedMax = 5, Action<T> ConsumerAction = null, Int32 MaxConsumers = 1, 
         Int32 MaxThrottleBuffer = 20, Int32 MaxDegreeOfParallelism = 10) 
        { 
         //-- Probably best to link MaxPerInterval and MaxThrottleBuffer 
         // and MaxConsumers with MaxDegreeOfParallelism 
         var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1, }; 
         var linkOptions = new DataflowLinkOptions { PropagateCompletion = true, }; 
    
         //-- Create the Queue 
         _queue = new BufferBlock<T>(new DataflowBlockOptions { BoundedCapacity = QueueBoundedMax, }); 
    
         //-- Create and link the throttle block 
         _throttleBlock = CreateThrottleBlock<T>(Interval, MaxPerInterval); 
         _queue.LinkTo(_throttleBlock, linkOptions); 
    
         //-- Create and link the consumer(s) to the throttle block 
         var consumerAction = (ConsumerAction != null) ? ConsumerAction : new Action<T>(ConsumeItem); 
         _consumers = new List<Task>(); 
         for (int i = 0; i < MaxConsumers; i++) 
         { 
          var consumer = new ActionBlock<T>(consumerAction, consumerOptions); 
          _throttleBlock.LinkTo(consumer, linkOptions); 
          _consumers.Add(consumer.Completion); 
         } 
    
         //-- TODO: Add some cancellation tokens to shut this thing down 
        } 
    
        /// <summary> 
        /// Default Consumer Action, just prints to console 
        /// </summary> 
        /// <param name="ItemToConsume"></param> 
        private void ConsumeItem(T ItemToConsume) 
        { 
         Log($"Consumed {ItemToConsume} at {DateTime.UtcNow}"); 
        } 
    
        public async Task EnqueueAsync(T ItemToEnqueue) 
        { 
         await this._queue.SendAsync(ItemToEnqueue); 
        } 
    
        public async Task EnqueueItemsAsync(IEnumerable<T> ItemsToEnqueue) 
        { 
         foreach (var item in ItemsToEnqueue) 
         { 
          await this._queue.SendAsync(item); 
         } 
        } 
    
        public async Task CompleteAsync() 
        { 
         this._queue.Complete(); 
         await Task.WhenAll(_consumers); 
         Console.WriteLine($"All consumers completed {DateTime.UtcNow}"); 
        } 
        private static void Log(String messageToLog) 
        { 
         System.Diagnostics.Trace.WriteLine(messageToLog); 
         Console.WriteLine(messageToLog); 
        } 
    
    } 
    

    - 사용 예 -

    제네릭 WorkItem에서

    public class WorkItem<Toutput,Tinput> 
    { 
        private TaskCompletionSource<Toutput> _tcs; 
        public Task<Toutput> Task { get { return _tcs.Task; } } 
    
        public Tinput InputData { get; private set; } 
        public Toutput OutputData { get; private set; } 
    
        public WorkItem(Tinput inputData) 
        { 
         _tcs = new TaskCompletionSource<Toutput>(); 
         InputData = inputData; 
        } 
    
        public void Complete(Toutput result) 
        { 
         _tcs.SetResult(result); 
        } 
    
        public void Failed(Exception ex) 
        { 
         _tcs.SetException(ex); 
        } 
    
        public override string ToString() 
        { 
         return InputData.ToString(); 
        } 
    } 
    

    파이프 라인에서 실행되는 액션 블록을 만들기

    private Action<WorkItem<Location,PointToLocation>> CreateProcessingAction() 
        { 
         return new Action<WorkItem<Location,PointToLocation>>(async i => { 
          var sw = new Stopwatch(); 
          sw.Start(); 
    
          var taskItem = ((WorkItem<Location,PointToLocation>)i); 
          var inputData = taskItem.InputData; 
    
          //Log($"Consuming: {inputData.Latitude},{inputData.Longitude} {DateTime.UtcNow:mm:ss:ff}"); 
    
          //-- Assume calling another async method e.g. await httpClient.DownloadStringTaskAsync(url); 
          await Task.Delay(500); 
          sw.Stop(); 
          Location outData = new Location() 
          { 
           Latitude = inputData.Latitude, 
           Longitude = inputData.Longitude, 
           StreetAddress = $"Consumed: {inputData.Latitude},{inputData.Longitude} Duration(ms): {sw.ElapsedMilliseconds}" 
          }; 
          taskItem.Complete(outData); 
          //Console.WriteLine($"Consumed: {taskItem.url} {DateTime.UtcNow}"); 
         }); 
    
        } 
    

    시험 방법 당신은 PointToLocation 및 위치에 대한 자신의 구현을 제공해야합니다. 자신의 수업과 함께 사용하는 방법에 대한 예입니다.

    int startRange = 0; 
        int nextRange = 1000; 
        ThrottledProducerConsumer<WorkItem<Location,PointToLocation>> tpc; 
        private void cmdTestPipeline_Click(object sender, EventArgs e) 
        { 
         Log($"Pipeline test started {DateTime.Now:HH:mm:ss:ff}"); 
    
         if(tpc == null) 
         { 
          tpc = new ThrottledProducerConsumer<WorkItem<Location, PointToLocation>>(
           //1010, 2, 20000, 
           TimeSpan.FromMilliseconds(1010), 45, 100000, 
           CreateProcessingAction(), 
           2,45,10); 
         } 
    
         var workItems = new List<WorkItem<Models.Location, PointToLocation>>(); 
         foreach (var i in Enumerable.Range(startRange, nextRange)) 
         { 
          var ptToLoc = new PointToLocation() { Latitude = i + 101, Longitude = i + 100 }; 
          var wrkItem = new WorkItem<Location, PointToLocation>(ptToLoc); 
          workItems.Add(wrkItem); 
    
    
          wrkItem.Task.ContinueWith(t => 
          { 
           var loc = t.Result; 
           string line = $"[Simulated:{DateTime.Now:HH:mm:ss:ff}] - {loc.StreetAddress}"; 
           //txtResponse.Text = String.Concat(txtResponse.Text, line, System.Environment.NewLine); 
           //var lines = txtResponse.Text.Split(new string[] { System.Environment.NewLine}, 
           // StringSplitOptions.RemoveEmptyEntries).LongCount(); 
    
           //lblLines.Text = lines.ToString(); 
           //Log(line); 
    
          }); 
          //}, TaskScheduler.FromCurrentSynchronizationContext()); 
    
         } 
    
         startRange += nextRange; 
    
         tpc.EnqueueItemsAsync(workItems); 
    
         Log($"Pipeline test completed {DateTime.Now:HH:mm:ss:ff}"); 
        }