8

우선 순위 ActionBlock<T>을 구현하고 싶습니다. 그래서 나는 조건부로 Predicate<T>을 사용하여 일부 TInput 항목에 우선 순위를 부여 할 수 있습니다.
나는 Parallel Extensions Extras SamplesGuide to Implementing Custom TPL Dataflow Blocks을 읽었습니다.
그러나 여전히이 시나리오를 어떻게 구현할 수 있는지 파악하지 못합니다.
---------------------------- 편집 ------------------- --------
5 개의 태스크가 동시에 실행될 수있는 태스크가 있습니다. 사용자가 버튼을 누를 때 일부 (술어 기능에 따라 다름) 태스크가 최우선 순위로 실행되어야합니다. 사실
는이 코드를ActionBlock 커스터마이징 <T>

TaskScheduler taskSchedulerHighPriority; 
ActionBlock<CustomObject> actionBlockLow; 
ActionBlock<CustomObject> actionBlockHigh; 
... 
queuedTaskScheduler = new QueuedTaskScheduler(TaskScheduler.Default, 5); 
taskSchedulerHigh = queuedTaskScheduler.ActivateNewQueue(0); 
taskSchedulerLow = queuedTaskScheduler.ActivateNewQueue(1); 
... 
actionBlockHigh = new ActionBlock<CustomObject>(new Action<CustomObject>(method), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, SingleProducerConstrained = false, TaskScheduler = taskSchedulerHigh }); 
actionBlockLow = new ActionBlock<CustomObject>(new Action<CustomObject>(method), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, MaxMessagesPerTask = 1, TaskScheduler = taskSchedulerLow }); 
...  
if (predicate(customObject)) 
    actionBlockHigh.Post(customObject); 
else 
    actionBlockLow.Post(customObject); 

쓰기하지만 우선 순위는 전혀 영향을 고려하지 않습니다 보인다.
---------------------------- 편집 ------------------

actionBlockHigh = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { TaskScheduler = taskSchedulerHigh }); 
actionBlockLow = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { TaskScheduler = taskSchedulerLow }); 

원인 응용 프로그램이 제대로 작업의 우선 순위를 관찰하지만, 단 하나의 작업이 그 사이에 표시되는 첫 번째 코드 블록을 사용하여 한 번에 실행할 수 있습니다 난 내가 코드 줄을 사용하는 경우는 사실을 발견 흐르는, 응용 프로그램이 동시에 5 개의 작업을 실행하지만 부적절한 우선 순위를 야기합니다.

actionBlockHigh = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, TaskScheduler = taskSchedulerHigh }); 
actionBlockLow = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, TaskScheduler = taskSchedulerLow }); 

업데이트 :
탱크 내가 taskSchedulerLow에 대한 MaxMessagesPerTask를 지정해야합니다, svick합니다.

+2

우선 순위를 결정하는 것은 무엇입니까? 'T '와 전혀 관련이없는 것입니까? 아니면 'T'의 고유/파생 된 속성입니까? – casperOne

+0

ConcurrentPriorityQueue를 사용하는 사용자 정의 버퍼 블록을 만들거나 사용자 정의 비동기 transfromation 블록을 생성 할 수 있습니다. 두 옵션 모두 중요하지 않습니다. @casperOne에 동의하십시오. 귀하의 경우 우선 순위는 무엇을 의미합니까? –

답변

7

귀하의 질문에 많은 세부 정보가 포함되어 있지 않으므로 다음은 필요한 사항 일뿐입니다.

이 작업을 수행하는 가장 간단한 방법은 QueuedTaskScheduler from ParallelExtensionsExtras의 다른 우선 순위에서 실행되는 두 개의 ActionBlock을 사용하는 것입니다. 술어를 사용하여 우선 순위가 높은 우선 순위로 연결 한 다음 우선 순위가 낮은 우선 순위로 연결합니다. 또한 우선 순위가 높은 Task이 대기하지 않도록하려면 우선 순위가 낮은 블록의 MaxMessagesPerTask을 설정하십시오.

이것은 당신이 무엇을 할 수 있는지 단지 스케치입니다
static ITargetBlock<T> CreatePrioritizedActionBlock<T>(
    Action<T> action, Predicate<T> isPrioritizedPredicate) 
{ 
    var buffer = new BufferBlock<T>(); 

    var scheduler = new QueuedTaskScheduler(1); 

    var highPriorityScheduler = scheduler.ActivateNewQueue(0); 
    var lowPriorityScheduler = scheduler.ActivateNewQueue(1); 

    var highPriorityBlock = new ActionBlock<T>(
     action, new ExecutionDataflowBlockOptions 
     { 
      TaskScheduler = highPriorityScheduler 
     }); 
    var lowPriorityBlock = new ActionBlock<T>(
     action, new ExecutionDataflowBlockOptions 
     { 
      TaskScheduler = lowPriorityScheduler, 
      MaxMessagesPerTask = 1 
     }); 

    buffer.LinkTo(highPriorityBlock, isPrioritizedPredicate); 
    buffer.LinkTo(lowPriorityBlock); 

    return buffer; 
} 

, 예를 들어, 반환 된 블록의 Completion이 제대로 작동하지 않습니다

코드에서는 같이 보일 것입니다.

+0

편집 된 태그 – Rzassar

+1

을 읽으십시오. 코드에서 우선 순위가 낮은 블록에 대해 'MaxMessagesPerTask'를 지정하지 마십시오. 내가 말했듯이, 그렇게하는 것은 아주 중요합니다. – svick