2013-12-20 7 views
1

작업 병렬 라이브러리를 사용하여 다음과 같은 동작을 시도하고 있습니다 :사용 가능한 메시지 일괄 처리를 받으십시오.

메시지가 도착하면서 순차적으로 처리하지만 그룹으로 처리하고 싶습니다. 따라서 첫 번째 메시지가 도착하면 즉시 처리해야합니다. 첫 번째는 후 처리되는 동안이 메시지가 올 경우 그들은 내가 거의 내가 BatchBlockActionBlock

var batchBlock = new BatchBlock<int>(100); 

var actionBlock = new ActionBlock<int[]>(list => 
    { 
     // do work 

     // now trigger 
     batchBlock.TriggerBatch(); 
    }); 

batchBlock.LinkTo(actionBlock); 

문제와 연결하여 원하는 것을 얻을 수 있습니다 2.

의 그룹으로 처리해야한다 위 코드는 아이템이 TriggerBatch() 콜 이후에 도착하면 배치가 채워질 때까지 기다려야한다는 것입니다. 그 대신 각 게시물 다음에 일괄 처리를 실행하면 ActionBlock은 항상 단일 메시지를받습니다.

+0

이 동작이 필요한 이유를 설명해 주시겠습니까? – svick

+0

@svick 데이터베이스 업데이트를 배치하여 데이터베이스 라운드 트립 횟수를 줄이고 싶습니다. 변경 사항이 스트림으로 제공됩니다. – Brownie

답변

0

BatchBlock 대신 BufferBlockTask을 입력하여 해당 항목을 수신하고 논리에 따라 대상에 대해 일괄 적으로 다시 보낼 수 있습니다. 일괄 처리가 포함 된 메시지를 보내야하고 다른 항목이 들어오는 경우이를 취소해야합니다. 대상 블록 (샘플의 actionBlock)은 1로 설정된 BoundedCapacity이어야합니다.

그래서, 먼저 뭔가를받습니다. 그렇게하면 비동기 적으로 전송되기 시작하며 더 많은 항목을 받으려고합니다. 전송이 먼저 완료되면 다시 시작합니다. 수신이 먼저 완료되면 전송을 취소하고 수신 된 항목을 일괄 처리에 추가 한 다음 두 비동기 작업을 다시 시작합니다.

실제 코드는 약간의 코너 사례 (수신 및 송신은 동시에 완료, 송신은 취소 할 수 없음, 수신 완료, 전체가 완료 되었기 때문에)를 처리해야하기 때문에 약간 더 복잡합니다. :

public static ITargetBlock<T> CreateBatchingWrapper<T>(
ITargetBlock<IReadOnlyList<T>> target) 
{ 
    // target should have BoundedCapacity == 1, 
    // but there is no way to check for that 

    var source = new BufferBlock<T>(); 

    Task.Run(() => BatchItems(source, target)); 

    return source; 
} 

private static async Task BatchItems<T>(
    IReceivableSourceBlock<T> source, ITargetBlock<IReadOnlyList<T>> target) 
{ 
    try 
    { 
     while (true) 
     { 
      var messages = new List<T>(); 

      // wait for first message in batch 
      if (!await source.OutputAvailableAsync()) 
      { 
       // source was completed, complete target and return 
       target.Complete(); 
       return; 
      } 

      // receive all there is right now 
      source.ReceiveAllInto(messages); 

      // try sending what we've got 
      var sendCancellation = new CancellationTokenSource(); 
      var sendTask = target.SendAsync(messages, sendCancellation.Token); 

      var outputAvailableTask = source.OutputAvailableAsync(); 

      while (true) 
      { 
       await Task.WhenAny(sendTask, outputAvailableTask); 

       // got another message, try cancelling send 
       if (outputAvailableTask.IsCompleted 
        && outputAvailableTask.Result) 
       { 
        sendCancellation.Cancel(); 

        // cancellation wasn't successful 
        // and the message was received, start another batch 
        if (!await sendTask.EnsureCancelled() && sendTask.Result) 
         break; 

        // send was cancelled, receive messages 
        source.ReceiveAllInto(messages); 

        // restart both Tasks 
        sendCancellation = new CancellationTokenSource(); 
        sendTask = target.SendAsync(
         messages, sendCancellation.Token); 
        outputAvailableTask = source.OutputAvailableAsync(); 
       } 
       else 
       { 
        // we get here in three situations: 
        // 1. send was completed succesfully 
        // 2. send failed 
        // 3. input has completed 
        // in cases 2 and 3, this await is necessary 
        // in case 1, it's harmless 
        await sendTask; 

        break; 
       } 
      } 
     } 
    } 
    catch (Exception e) 
    { 
     source.Fault(e); 
     target.Fault(e); 
    } 
} 

/// <summary> 
/// Returns a Task that completes when the given Task completes. 
/// The Result is true if the Task was cancelled, 
/// and false if it completed successfully. 
/// If the Task was faulted, the returned Task is faulted too. 
/// </summary> 
public static Task<bool> EnsureCancelled(this Task task) 
{ 
    return task.ContinueWith(t => 
    { 
     if (t.IsCanceled) 
      return true; 
     if (t.IsFaulted) 
     { 
      // rethrow the exception 
      ExceptionDispatchInfo.Capture(task.Exception.InnerException) 
       .Throw(); 
     } 

     // completed successfully 
     return false; 
    }); 
} 

public static void ReceiveAllInto<T>(
    this IReceivableSourceBlock<T> source, List<T> targetCollection) 
{ 
    // TryReceiveAll would be best suited for this, except it's bugged 
    // (see http://connect.microsoft.com/VisualStudio/feedback/details/785185) 
    T item; 
    while (source.TryReceive(out item)) 
     targetCollection.Add(item); 
} 
0

타이머를 사용할 수도 있습니다. 트리거를 매 10 초마다 실행합니다.

관련 문제