2014-04-06 2 views
5

해당 항목을 병렬로 처리하는 TransformBlock을 설정하고 싶습니다. 따라서, 나는 메시지의 순서에 대해 걱정하지 않는다> 1. ExecutionDataflowBlockOptions.MaxDegreeOfParallelism를하고 있어요하지만 documentation는 말한다 : 당신이 1보다 큰 병렬 처리의 최대 수준을 지정하면TPL 데이터 흐름 라이브러리를 사용하여 정렬되지 않은 실행 블록을 어떻게 지정할 수 있습니까?

, 여러 메시지가 동시에 처리되므로 메시지는 수신 된 순서대로 처리되지 않을 수 있습니다. 그러나 메시지가 블록에서 출력되는 순서는 올바르게 정렬됩니다.

대기열에 처리 시간이 오래 걸리는 메시지가 하나있는 경우이 하나의 메시지가 처리 될 때까지 추가 메시지가 출력되지 않는다는 의미입니까?

그렇다면 순서를 신경 쓰지 않는 실행 블록 (예 : TransformBlock)을 어떻게 지정할 수 있습니까? 아니면 주문에 대해 신경 쓰지 말고 소비 마지막에 지정해야합니까?

답변

5

라이브러리에는 이러한 블록이 없지만 ActionBlockBufferBlock을 조합하여 쉽게 만들 수 있습니다. 같은 뭔가 : 항목이 ActionBlock에 의해 처리되면

public static IPropagatorBlock<TInput, TOutput> 
    CreateUnorderedTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> func, ExecutionDataflowBlockOptions options) 
{ 
    var buffer = new BufferBlock<TOutput>(options); 
    var action = new ActionBlock<TInput>(
     async input => 
     { 
      var output = func(input); 
      await buffer.SendAsync(output); 
     }, options); 

    action.Completion.ContinueWith(
     t => 
     { 
      IDataflowBlock castedBuffer = buffer; 

      if (t.IsFaulted) 
      { 
       castedBuffer.Fault(t.Exception); 
      } 
      else if (t.IsCanceled) 
      { 
       // do nothing: both blocks share options, 
       // which means they also share CancellationToken 
      } 
      else 
      { 
       castedBuffer.Complete(); 
      } 
     }); 

    return DataflowBlock.Encapsulate(action, buffer); 
} 

이 방법은 즉시 순서가 유지되지 의미는 BufferBlock로 이동합니다.

이 코드의 한 가지 문제점은 사실 BoundedCapacity 집합을 준수하지 않는다는 것입니다. 사실상이 블록의 용량은 옵션에서 설정된 용량의 두 배입니다 (두 블록마다 별도의 용량이 있으므로).

+0

감사합니다. 이것은 https://msdn.microsoft.com/en-us/library/hh228606(v=vs.110).aspx에서 예제로 사용되어야합니다. Encapsulate는 데이터 흐름을 변경하는 데 유용하다는 것을 분명히하기 때문에 공장. – nlawalker

+1

2020 년에 EnsureOrdered가 TransformBlock 및 TransformManyBlock을 사용하여 DataflowBlockOptions에 추가되었습니다. 정렬되지 않은 TransformBlock의 경우, EnsureOrdered를 DataflowBlockOptions https://github.com/dotnet/corefx/pull/5191에서 false로 설정합니다. – NPNelson

관련 문제