2013-08-22 1 views
1

MQ를 처음 사용하고 RabbitMQ로 로깅 시스템을 구현하려고합니다. 내 구현에는 '보낸 사람'이 포함됩니다.소비자가 듣기 전에 메시지를 보낼 때 소비자가 MQ로부터 메시지를 수신하지 않습니다.

/* 
* This class sends messages over MQ 
*/ 
public class MQSender { 
    private final static String EXCHANGE_NAME = "mm_exchange"; 
    private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"}; 

    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { 
     /* 
     * Boilerplate stuff 
     */ 
     ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost("localhost"); 
     Connection connection = factory.newConnection(); 
     Channel channel = connection.createChannel(); 

     //declare the exchange that messages pass through, type=direct 
     channel.exchangeDeclare(EXCHANGE_NAME, "direct"); 

     String[] levels = {"green", "orange", "red", "black"}; 
     for (String log_level : levels) { 
      String message = "This is a " + log_level + " message"; 
      System.out.println("Sending " + log_level + " message"); 
      //publish the message with each of the bindings in levels 
      channel.basicPublish(EXCHANGE_NAME, log_level, null, message.getBytes()); 
     } 

     channel.close(); 
     connection.close(); 
    } 
} 

내 색상마다 하나의 메시지를 교환기로 보내며, 색상은 바인딩으로 사용됩니다. 그리고 그것은 '수신기'매개 변수로 나는이 교환에서 공급되는하고자하는 색상 바인딩을 나타내는 번호가 부여됩니다

public class MQReceiver { 
    private final static String EXCHANGE_NAME = "mm_exchange"; 
    private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"}; 

    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { 
     receiveMessagesFromQueue(2); 
    } 

    public static void receiveMessagesFromQueue(int maxLevel) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { 
     /* 
     * Boilerplate stuff 
     */ 
     ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost("localhost"); 
     Connection connection = factory.newConnection(); 
     Channel channel = connection.createChannel(); 

     //declare the exchange that messages pass through, type=direct 
     channel.exchangeDeclare(EXCHANGE_NAME, "direct"); 

     //generate random queue 
     String queueName = channel.queueDeclare().getQueue(); 

     //set bindings from 0 to maxLevel for the queue 
     for (int level = 0; level <= maxLevel; level++) { 
      channel.queueBind(queueName, EXCHANGE_NAME, LOG_LEVELS[level]); 
     } 

     QueueingConsumer consumer = new QueueingConsumer(channel); 
     channel.basicConsume(queueName, true, consumer); 

     while(true) { 
      //waits until a message is delivered then gets that message 
      QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
      String message = new String(delivery.getBody()); 
      String routingKey = delivery.getEnvelope().getRoutingKey(); 

      System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); 
     } 
    } 
} 

을 포함한다.

내 구현 및 일반적으로 RabbitMQ에서 Consumer이 메시지를 요청할 때까지 메시지가 교환기에 저장되어있는 것처럼 보입니다. 메시지가 클라이언트의 요청에 따라 각각의 대기열에 배포 된 다음 클라이언트에게 한 번에 하나씩 전송됩니다. (또는 MQ 용어에서의 소비자). 내 문제는 MQReceiver 클래스를 실행하기 전에 MQSender 클래스를 실행할 때 메시지가 배달되지 않는다는 것입니다. 그러나 먼저 MQReceiver 클래스를 실행하면 메시지가 수신됩니다. MQ에 대한 나의 이해에서 나는 메시지가 서버에 저장되어야한다고 생각할 것이다. MQReceiver 클래스가 실행되면 메시지는 소비자에게 전달되어야하지만, 이것은 일어나지 않는다. 가장 중요한 질문은 이러한 메시지를 Exchange에 저장할 수 있는지 여부입니다. 그렇지 않은 경우 소비자 (예 : MQReceiver 클래스)를 호출하면 메시지를 배달 할 수 있도록 어디에 저장해야합니까?

도움 주셔서 감사합니다.

+0

그냥 추측,하지만 난 당신'Sender'가 폐기되는 의심 등록 된 'Consumer's가 부족하여 메시지가 – StormeHawke

+0

어쩌면 autoAck이 true로 설정되어 있습니까 이자형? 더 많은 정보는 여기에 있습니다 : http://www.rabbitmq.com/tutorials/tutorial-two-java.html –

+0

이것 또한 아마도? http://stackoverflow.com/questions/6386117/rabbitmq-use-of-immediate-and-mandatory-bits –

답변

1

라우팅 키가 Exchange에 바인딩 된 대기열과 일치하지 않으면 RabbitMQ가 메시지를 삭제합니다. 처음에 MQSender을 시작하면 대기열이 바인딩되지 않으므로 전송 된 메시지는 손실됩니다. MQReceiver을 시작하면 대기열을 교환기에 바인드하므로 RabbitMQ에 MQSender의 메시지를 넣을 수 있습니다. 익명 큐를 생성 했으므로 MQReceiver를 중지하면 대기열과 모든 바인딩이 교환기에서 제거됩니다.

MQReceiver이 실행되지 않는 동안 서버에 메시지를 저장하려면 명명 된 큐를 만들고 해당 큐에 라우팅 키를 바인딩 할 수신기가 있어야합니다. 명명 된 대기열을 만드는 것은 멱등수이며 대기열은 이미 존재하는 경우 만들어지지 않습니다. 그런 다음 수신자가 이름이 지정된 대기열에서 메시지를 가져와야합니다.

는 다음과 같이보고 코드를 변경

:

MQSender

.... 
String namedQueue = "logqueue"; 
//declare named queue and bind log level routing keys to it. 
//RabbitMQ will put messages with matching routing keys in this queue 
channel.queueDeclare(namedQueue, false, false, false, null); 
for (int level = 0; level < LOG_LEVELS.length; level++) { 
    channel.queueBind(namedQueue, EXCHANGE_NAME, LOG_LEVELS[level]); 
} 
... 

MQReceiver

... 
channel.exchangeDeclare(EXCHANGE_NAME, "direct"); 

QueueingConsumer consumer = new QueueingConsumer(channel); 

//Consume messages off named queue instead of anonymous queue 
String namedQueue = "logqueue"; 
channel.basicConsume(namedQueue, true, consumer); 

while(true) { 
... 
관련 문제