나는 rabbitMq 큐에서 메시지를 병렬로 처리하려고한다. 대기열은 autoAck = false로 구성됩니다. threadPoolSize 매개 변수를 지원하는 camel endpoints에 대한 camel-rabbitMQ 지원을 사용하고 있지만 원하는 효과가 없습니다. threadpoolsize = 20 인 경우에도 메시지는 여전히 큐에서 연속적으로 처리됩니다.토끼 Mq 자바 클라이언트 병렬 소비
코드를 디버깅 한 결과 threadpoolsize 매개 변수는 here과 같이 토끼 연결 속성에 전달하는 데 사용되는 ExecutorService를 만드는 데 사용됨을 알 수 있습니다. 이 모든 것은 토끼에 들어갈 때까지 좋아 보인다. ConsumerWorkService
. 여기서 메시지는 최대 크기 16 개의 메시지 블록으로 처리됩니다. 블록의 각 메시지는 순차적으로 처리 된 후 실행 프로그램을 수행 할 작업이 더 있으면 다음 블록으로 호출됩니다. 이에 대한 코드 스 니펫은 아래와 같습니다. Executor 서비스를 사용하여 메시지를 병렬로 처리하는 방법을 볼 수 없습니다. executorservice는 한 번에 하나의 작업 만 수행 할 수 있습니다.
내가 누락 되었습니까? ConsumerWorkService
는 스레드 풀을 사용하는 경우에도
private final class WorkPoolRunnable implements Runnable {
public void run() {
int size = MAX_RUNNABLE_BLOCK_SIZE;
List<Runnable> block = new ArrayList<Runnable>(size);
try {
Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size);
if (key == null) return; // nothing ready to run
try {
for (Runnable runnable : block) {
runnable.run();
}
} finally {
if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) {
ConsumerWorkService.this.executor.execute(new WorkPoolRunnable());
}
}
} catch (RuntimeException e) {
Thread.currentThread().interrupt();
}
}
다른 블록 크기를 사용하도록 ConsumerWorkService를 구성 할 수 있습니까? –
안녕하세요 Claus, 저는 Fergus Nelson으로 github을 통해 Camel-rabbitmq 구성 요소를 약간 변경했습니다. RabbitMqConsumer를 변경하여 필요한 각 동시 소비자에 대한 채널을 설정했습니다. 내가 모든 것을 테스트했을 때 나는 Jira + pull 요청을 만들 것이다. –
@ mR_fr0g, 아시다시피 Camel-RabbitMQ 구성 요소에 여러 채널을 만들어 문제를 해결했습니다. Jira 티켓에 대한 링크를 제공하고, 요청을 받고, 수정본이있는 Camel 버전을 지정할 수 있습니까? – wheleph