2014-03-25 5 views
4

큐에서 (수동으로) 래빗에서 메시지를 이동할 수 있기를 기대합니다. 예를 들어큐 간 메시지 이동 rabbitMQ

:

내가 예를 들어 메시지가 첫 큐에서 'A'로 두 번째 큐를 영입 할 수 있도록하려면
first-queue has messages ['a','b','c','d','e','f'] 
second-queue has messages ['x','y'] 

. 이것은 수동 조작 일 수 있습니다. 두 대기열은 모두 같은 브로커에 있으므로 어떤 교환기를 통해서도 보내지 않으려 고합니다. 이 작업을 수행 할 여지가 있습니까? 나는 rabbitmqctl로 놀았지만 작동시키지 못하고있다. 나는 이것을 달성 할 수있는 다른 도구를 열어두고 있습니다. 결국 나는 일종의 메시지 선택기 (예 : 일부 헤더 필드가있는 모든 메시지 이동 = X를 첫 번째 대기열에서 두 번째 대기열로 이동)를 갖기를 희망합니다.

저는 아직 rabbitmq와 amqp에 익숙하지 않지만 이것을 할 수있는 방법에 대한 문서를 찾을 수 없었습니다 (심지어 가능하다면).

감사합니다.

답변

4

문서화되지 않은 사실은 메시징 모델에서 멀리 떨어져 있기 때문입니다. 예를 들어 tutorial #1를 참조 - -

이 특정 큐에 메시지를 보내 쉽게하지만 그들을 소비에 메시지를 읽을 수있는 유일한 방법은 브로커가 클라이언트에 보내는 순서입니다.

메시지를 큐에서 선택하는 것은 SQL에서 할 수 있습니다. 당신이 할 수있는 일

는 이후로을 게시 재 클라이언트를하도록하는 것입니다 (또는 결국, 플러그인, 그러나 이것은 고급 주제) 몇 가지 규칙에 따라 큐에서 메시지를 사용하고, 대기열 또는 다른 대기열로 이동합니다.

6

@Dax - 난 그냥 여기이 같은 질문에 대답 :

Is it possible to move/merge messages between RabbitMQ queues? 내가 거기에 긴 설명이있다. 중복되는 콘텐츠를 피하기 위해 복사/붙여 넣기를 원하지 않습니다.

당신이 찾고있는 것 같아요 rabbitmq 셔블 플러그인입니다.

그것은 핵심에 내장되어

, 단순히 사용 : 당신은 삽을 만들 수있는 쉬운 인터페이스를 찾을 수있는 GUI의 관리 섹션에서

rabbitmq-plugins enable rabbitmq_shovel 
rabbitmq-plugins enable rabbitmq_shovel_management 

합니다.

나를위한 다른 게시물보기 deets!

public void moveMessages(
      final String sourceQueueName, 
      final String targetQueueName, 
      final String rabbitmqHost, 
      final String rabbitmqUsername, 
      final String rabbitmqPassword, 
      final String rabbitmqVirtualHost 
) throws IOException { 

     // Initialize the consuming and publishing channel 
     ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost(rabbitmqHost); 
     factory.setUsername(rabbitmqUsername); 
     factory.setPassword(rabbitmqPassword); 
     factory.setVirtualHost(rabbitmqVirtualHost); 
     Connection connection = factory.newConnection(); 

     Channel consumingChannel = connection.createChannel(); 
     Channel publishingChannel = connection.createChannel(); 


     while (true) { 
      // Get the first message in the queue (auto ack = false) 
      GetResponse response = consumingChannel.basicGet(sourceQueueName, false); 

      if (response == null) { 
       return; 
      } 

      BasicProperties properties = response.getProps(); 

      // Publish the message to the origin queue 
      publishingChannel.txSelect(); 
      publishingChannel.basicPublish("", targetQueueName, (AMQP.BasicProperties) properties, response.getBody()); 
      publishingChannel.txCommit(); 

      // Acknowledge the message in the dead letter queue 
      consumingChannel.basicAck(response.getEnvelope().getDeliveryTag(), false); 
     } 
    } 
: 여기
2

는 하나의 큐에서 다른 모든 이동하는 간단한 자바 코드