7

, 나는 같은 것을 할 싶습니다 :TPL Dataflow를 사용하여 모든 게시물을 취소 한 다음 추가 할 수 있습니까? TPL 데이터 흐름 라이브러리와

myActionBlock.Post(newValue, cancelAllPreviousPosts: true); 

그것은 ActionBlock에 취소 토큰이 모든 일을 취소 것으로 보인다; 내가 그것을 설정하면 나는 새로운 ActionBlock을 만들어야 할 것이다. ActionBlock을 사용하여 부분적으로 취소 할 수 있습니까?

아직 처리되지 않은 게시물은 시도되어서는 안됩니다. 현재 실행중인 게시물을 체크인 할 수있는 취소 토큰이 있으면 좋을 것입니다.

+0

나는이 게시물을 잠시 후에 게시했지만 이후 내 자신의 라이브러리를 만들었습니다. 내 Kts.ActorsLite 라이브러리에 "가장 최근의"작업 큐가 있습니다. https://github.com/BrannonKing/Kts.ActorsLite – Brannon

답변

4

BroadcastBlock<T>에서 가장 최근에 게시 된 항목 만 보유하고 있습니다. ActionBlock<T> 앞에 방송 블록을 놓을 수 있습니다.

브로드 캐스트 블록에 새 항목을 게시해도 작업 블록에서 현재 처리중인 항목이 취소되지는 않지만 브로드 캐스트 블록에서 이미 보유하고있는 기존 항목을 덮어 씁니다. 결과적으로 액션 블록에 의해 아직 처리되지 않은 오래된 메시지는 폐기됩니다. 조치 블록이 현재 항목을 완료하면 가장 최근 항목이 방송 블록에 게시됩니다.

+0

이것은 내가 찾고있는 대답입니다. 감사. – Brannon

0

직접 TPL 데이터 흐름에서이 같이 아무것도 없다,하지만 난 당신이 스스로를 구현할 수있는 방법을 여러 가지 방법으로 볼 수 있습니다 : 당신이로 수정 된 블록을 처리 할 수 ​​있어야 할 필요가없는 경우

  1. 을 일반 데이터 흐름 블록 (예 : LinkTo()에 대한 지원이없는 경우)은 간단한 방법으로 ActionBlock을 감싸는 유형을 작성하지만 해당 항목에도 처리해야하는지 여부를 나타내는 플래그가 포함됩니다. cancelAllPreviousPosts: true을 지정하면 모든 플래그가 재설정되므로 해당 항목은 건너 뜁니다.

    코드는 다음과 같이 보일 수 있습니다 :

    class CancellableActionBlock<T> 
    { 
        private class Item 
        { 
         public T Data { get; private set; } 
         public bool ShouldProcess { get; set; } 
    
         public Item(T data) 
         { 
          Data = data; 
          ShouldProcess = true; 
         } 
        } 
    
        private readonly ActionBlock<Item> actionBlock; 
        private readonly ConcurrentDictionary<Item, bool> itemSet; 
    
        public CancellableActionBlock(Action<T> action) 
        { 
         itemSet = new ConcurrentDictionary<Item, bool>(); 
         actionBlock = new ActionBlock<Item>(item => 
         { 
          bool ignored; 
          itemSet.TryRemove(item, out ignored); 
    
          if (item.ShouldProcess) 
          { 
           action(item.Data); 
          } 
         }); 
        } 
    
        public bool Post(T data, bool cancelAllPreviousPosts = false) 
        { 
         if (cancelAllPreviousPosts) 
         { 
          foreach (var item in itemSet.Keys) 
          { 
           item.ShouldProcess = false; 
          } 
          itemSet.Clear(); 
         } 
    
         var newItem = new Item(data); 
         itemSet.TryAdd(newItem, true); 
         return actionBlock.Post(newItem); 
        } 
    
        // probably other members that wrap actionBlock members, 
        // like Complete() and Completion 
    } 
    
  2. 를 더 작성 가능하고 재사용 가능한 무언가를 작성하려는 경우, 그냥 취소 특별한 블록을 만들 수 있습니다. BufferBlock을 함께 사용하여 구현할 수 있습니다. 세 번째는 용량 1, 두 번째 용량은 무제한입니다. 이렇게하면 대기중인 거의 모든 항목이 두 번째 블록에 있으므로 새 블록을 교체하기 만하면 취소 할 수 있습니다. 전체 구조는 첫 번째와 세 번째 블록에서 Encapsulate()으로 표시됩니다.

    이 접근법의 문제점은 취소가 1 번째 항목 (세 번째 블록에있는 항목)의 지연을 갖는다는 것입니다. 또한, 나는 이것을위한 좋은 인터페이스를 이해하지 못했습니다. 먼로 토마스의 대답에 추가

+0

예제에 대한 노력으로 여기 +50을 수여합니다. 감사. – Brannon

0

여전히 경우에도, BroadcastBlock을 다음 ActionBlock 그것이 BoundedCapacity 1 제한의 요구 사항을 이해하는 것이 중요하다 또는 브로드 캐스트 블록의 모든 메시지를 저장하고 처리 할 것 실행 중.
코드 예제는 여기 간다 :

ActionBlock<int> ExecuteBlock = new ActionBlock<int>(async ThisNumber => 
{ 
    await Task.Delay(100); 
    Console.WriteLine($">{ThisNumber}"); 
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }); 

BroadcastBlock<int> ThrottleBlock = new BroadcastBlock<int>(null); 
ThrottleBlock.LinkTo(ExecuteBlock, new DataflowLinkOptions { PropagateCompletion = true }); 

for(int IX = 0; IX < 128; IX++) 
{ 
    await ThrottleBlock.SendAsync(IX); 
    await Task.Delay(10); 
} 

이것은 다음과 같은 결과 :

>0 
>6 
>12 
>20 
>27 
>34 
>41 
>48 
>55 
>62 
>68 
>75 
>82 
>88 
>95 
>101 
>108 
>115 
>122 
>127 

즐기십시오!
- 시몬

관련 문제