2017-02-16 3 views
0

나는 질문을 here에 대해 Thread.Run을 사용하여 프로세스를 시작한 것이 예상 한만큼의 동시 요청을 실행하지 않은 이유에 대해 질문했습니다.rabbitmq 메시지 처리 concurrenrtly

이 질문의 이유는 내가 rabbitmq 대기열에서 메시지를 가져 와서 동시에 최대 수의 동시 메시지까지 처리 할 수있는 클래스를 만들려고했기 때문입니다.

이렇게하려면 EventingBasicConsumer 클래스의 Received 처리기에서 다음과 같이 끝났습니다.

async void Handle(EventArgs e) 
{ 
    await _semaphore.WaitAsync(); 

    var thread = new Thread(() => 
    { 
     Process(e); 
     _semaphore.Release(); 
     _channel.BasicAck(....); 
    }); 
    thread.Start(); 
} 

그러나 이전 게시물에 대한 의견은 CPU를 사용하지 않는 한 스레드를 시작하지 않는 것이 었습니다.

위의 처리기는 작업이 CPU 바인딩, 네트워크, 디스크 또는 기타 작업인지 여부를 알지 못합니다. (Process은 추상적 인 방법입니다).

심지어 스레드 또는 작업을 여기에서 시작해야한다고 생각합니다. 그렇지 않으면 Process 메서드가 rabbitmq 스레드를 차단하고 이벤트 처리기가 완료 될 때까지 다시 호출되지 않습니다. 그래서 한 번에 한 가지 방법 만 처리 할 수 ​​있습니다.

여기에서 새 Thread을 시작 하시겠습니까? 원래 나는 Task.Run을 사용 했었지만 이것은 원하는만큼의 작업자를 생산하지 못했습니다. 다른 게시물을 참조하십시오.

참고하시기 바랍니다. 동시 스레드 수는 InitialCount을 세마포어로 설정하여 제한합니다.

답변

0

이미 연결된 질문에서 말한 것처럼 많은 수의 스레드가 성능을 보장하지 못합니다. 즉, 숫자가 논리적 코어 수보다 많아지면 실제 작업을 수행하지 않고 thread starvation 상황이 발생합니다.

그러나 동시 작업 수를 처리해야하는 경우 this tutorial처럼 MaxDegreeOfParallelism까지 설정하여 TPL Dataflow 라이브러리에 시도해 볼 수 있습니다.

var workerBlock = new ActionBlock<EventArgs>(
    // Process event 
    e => Process(e), 
    // Specify a maximum degree of parallelism. 
    new ExecutionDataflowBlockOptions 
    { 
     MaxDegreeOfParallelism = InitialCount 
    }); 
var bufferBlock = new BufferBlock(); 
// link the blocks for automatically propagading the messages 
bufferBlock.LinkTo(workerBlock); 

// asynchronously send the message 
await bufferBlock.SendAsync(...); 
// synchronously send the message 
bufferBlock.Post(...); 

은 대기열이므로 메시지 순서가 유지됩니다. 또한, 필터 람다와 블록 연결로 (병렬 처리의 다른 학위를 가진) 다른 핸들러를 추가 할 수 있습니다

bufferBlock.LinkTo(cpuWorkerBlock, e => e is CpuEventArgs); 
bufferBlock.LinkTo(networkWorkerBlock, e => e is NetworkEventArgs); 
bufferBlock.LinkTo(diskWorkerBlock, e => e is DiskEventArgs); 

그러나 이것을 당신이 설정 체인의 끝에 기본 핸들러를해야하는 경우, 그래서에서

bufferBlock.LinkTo(DataflowBlock.NullTarget<EventArgs>); 

는 또한, 블록이 관측 될 수있다, 그래서 그들은 완벽하게 UI 측면에서 Reactive Extensions 작업 : 메시지 (이에 대한 NullTarget 블록을 사용할 수 있습니다) 사라지지 않을 것입니다.