2013-10-08 2 views
5

나는 rabbitMq 큐에서 메시지를 병렬로 처리하려고한다. 대기열은 autoAck = false로 구성됩니다. threadPoolSize 매개 변수를 지원하는 camel endpoints에 대한 camel-rabbitMQ 지원을 사용하고 있지만 원하는 효과가 없습니다. threadpoolsize = 20 인 경우에도 메시지는 여전히 큐에서 연속적으로 처리됩니다.토끼 Mq 자바 클라이언트 병렬 소비

코드를 디버깅 한 결과 threadpoolsize 매개 변수는 here과 같이 토끼 연결 속성에 전달하는 데 사용되는 ExecutorService를 만드는 데 사용됨을 알 수 있습니다. 이 모든 것은 토끼에 들어갈 때까지 좋아 보인다. ConsumerWorkService. 여기서 메시지는 최대 크기 16 개의 메시지 블록으로 처리됩니다. 블록의 각 메시지는 순차적으로 처리 된 후 실행 프로그램을 수행 할 작업이 더 있으면 다음 블록으로 호출됩니다. 이에 대한 코드 스 니펫은 아래와 같습니다. Executor 서비스를 사용하여 메시지를 병렬로 처리하는 방법을 볼 수 없습니다. executorservice는 한 번에 하나의 작업 만 수행 할 수 있습니다.

내가 누락 되었습니까? ConsumerWorkService는 스레드 풀을 사용하는 경우에도

private final class WorkPoolRunnable implements Runnable { 

     public void run() { 
      int size = MAX_RUNNABLE_BLOCK_SIZE; 
      List<Runnable> block = new ArrayList<Runnable>(size); 
      try { 
       Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size); 
       if (key == null) return; // nothing ready to run 
       try { 
        for (Runnable runnable : block) { 
         runnable.run(); 
        } 
       } finally { 
        if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) { 
         ConsumerWorkService.this.executor.execute(new WorkPoolRunnable()); 
        } 
       } 
      } catch (RuntimeException e) { 
       Thread.currentThread().interrupt(); 
      } 
     } 
+0

다른 블록 크기를 사용하도록 ConsumerWorkService를 구성 할 수 있습니까? –

+1

안녕하세요 Claus, 저는 Fergus Nelson으로 github을 통해 Camel-rabbitmq 구성 요소를 약간 변경했습니다. RabbitMqConsumer를 변경하여 필요한 각 동시 소비자에 대한 채널을 설정했습니다. 내가 모든 것을 테스트했을 때 나는 Jira + pull 요청을 만들 것이다. –

+0

@ mR_fr0g, 아시다시피 Camel-RabbitMQ 구성 요소에 여러 채널을 만들어 문제를 해결했습니다. Jira 티켓에 대한 링크를 제공하고, 요청을 받고, 수정본이있는 Camel 버전을 지정할 수 있습니까? – wheleph

답변

3

RabbitMQ의 문서, 이것에 대해 매우 분명하지 않지만,이 풀은 병렬로 메시지를 처리하는 방식으로 사용하지 않는 것 :

각 채널에는 자체 디스패치 스레드가 있습니다. 채널당 한 소비자의 가장 일반적인 사용 사례의 경우 소비자는 다른 소비자를지지하지 않습니다. 한 채널 당 여러 명의 소비자가있는 경우 장기 실행 소비자가 해당 채널의 다른 소비자에게 콜백을 보류 할 수 있음을 알고 있어야합니다.

(http://www.rabbitmq.com/api-guide.html)

이 문서는 단순히 동시성의 필요한 수준만큼 Channel의를 만들 경우, 실제로 쓰레드 당 하나의 Channel 등을 사용하여 제안, 메시지에 연결된 소비자들 사이에 발송됩니다 이 채널들.

2 개의 채널과 소비자로 테스트했습니다. 2 개의 메시지가 대기열에있는 경우 각 소비자는 한 번에 하나의 메시지 만 선택합니다. 언급 한 16 개의 메시지 블록이 간섭하지 않는 것, 이는 좋은 것입니다.

사실, Spring AMQP는 메시지를 동시에 처리 할 수있는 여러 채널을 생성한다. SimpleMessageListenerContainer.setConcurrentConsumers(...) 설정

.

3

Channel 인스턴스가있는 경우 정확하게 ConsumerWorkService을 조사하여 등록 된 소비자를 순차적으로 호출합니다. 이를 극복하기위한 두 가지 방법이 있습니다.

  1. 하나가 아닌 여러 채널을 사용하십시오.
  2. 단일 채널을 사용하지만 소비자를 특별한 방법으로 구현하십시오. 대기열에서 들어오는 메시지를 선택하여 내부 스레드 풀에 작업으로 넣어야합니다.

자세한 내용은 this post에서 확인할 수 있습니다.

관련 문제