2013-08-09 2 views
2

기본 연결이 닫히면 잠시 후에 자동으로 브로커에 다시 연결되는 소비자를 구현했습니다. 내 경우는 다음과 같습니다.RabbitMQ는 소비자가 다시 연결되기 전에 메시지를 잃습니다.

  1. RabbitMQ 서버를 성공적으로 시작합니다.
  2. 소비자를 성공적으로 시작하십시오.
  3. 메시지 게시 및 소비자가 성공적으로 받았습니다.
  4. 정지 RabbitMQ 서버 예외가 표시됩니다 소비자 :

    com.rabbitmq.client.ShutdownSignalException : 연결 오류; 이유 : {# 메소드 (응답 코드 = 541, 응답 텍스트 = INTERNAL_ERROR, 클래스 ID = 0, 메소드 ID = 0), null, ""}.

    그리고 소비자는 다시 연결하기 전에 60 초 정도 잠자기 상태입니다.

  5. RabbitMQ 서버를 다시 시작하십시오.
  6. 'list_queues'명령의 결과는 0
  7. 입니다. 60 초 후에 소비자는 RabbitMQ에 다시 연결되지만 이제는 6 단계에서 게시 된 메시지가 수신됩니다.
  8. 소비자가 세 번째 메시지를 게시하면 소비자가 성공적으로이를 받았습니다.

이 경우 다시 연결하기 전에 게시 된 모든 메시지가 손실됩니다. 또 다른 실험을했습니다.

  1. RabbitMQ를 시작하고 성공적으로 메시지를 게시하십시오 (소비자 프로세스가 시작되지 않음).
  2. RabbitMQ를 중지 한 다음 다시 시작하십시오.
  3. 소비자 프로세스를 시작하고 1 단계에서 게시 한 메시지를받습니다.

참고 : 소비자의 QOS는 1입니다.며칠 동안 RabbitMQ를 조사했는데, 소비자는 다시 연결하기 전에 메시지를 게시해야합니다. Pls help (Windows rabbitMQ를 기반으로 테스트를 실행했습니다.) 다음은

은 발행인 :

ConnectionFactory factory = new ConnectionFactory(); 
factory.setHost(this.getHost()); 
connection = factory.newConnection(); 
Channel channel = connection.createChannel();    
channel = conn.createChannel(); 
// declare a 'topic' type of exchange 
channel.exchangeDeclare(exchangeName, "topic"); 
// Content-type "application/octet-stream", deliveryMode 2 
// (persistent), priority zero 
channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_BASIC, message); 
connection.close(); 

그리고 소비자는 다음과 같습니다 :

그것은 '주제'교환이므로
@Override 
public void consume(final String exchangeName, final String queueName, final String routingKey, 
     final int qos) throws IOException, InterruptedException { 
    ConnectionFactory factory = new ConnectionFactory(); 
    factory.setHost(this.getHost()); 

    while (true) { 
     Connection connection = null; 
     try { 
      connection = factory.newConnection(); 
      Channel channel = connection.createChannel(); 

      channel.exchangeDeclare(exchangeName, "topic"); 
      // declare a durable, non-exclusive, non-autodelete queue. 
      channel.queueDeclare(queueName, true, false, false, null); 
      channel.queueBind(queueName, exchangeName, routingKey); 
      // distribute workload among all consumers, consumer will 
      // pre-fetch 
      // {qos} 
      // messages to local buffer. 
      channel.basicQos(qos); 

      logger.debug(" [*] Waiting for messages. To exit press CTRL+C"); 

      QueueingConsumer consumer = new QueueingConsumer(channel); 
      // disable auto-ack. If enable auto-ack, RabbitMQ delivers a 
      // message to 
      // the customer it immediately removes it from memory. 
      boolean autoAck = false; 
      channel.basicConsume(queueName, autoAck, consumer); 

      while (true) { 
       QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
       try { 
        RabbitMessageConsumer.this.consumeMessage(delivery); 
       } 
       catch (Exception e) { 
        // the exception shouldn't affect the next message 
        logger.info("[IGNORE]" + e.getMessage()); 
       } 
       channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 
      } 
     } 
     catch (Exception e) { 
      logger.warn(e); 
     } 

     if (autoReconnect) { 
      this.releaseConn(connection); 
      logger.info("[*] Will try to reconnect to remote host(" + this.getHost() + ") in " 
        + this.reconnectInterval/1000 + " seconds."); 
      Thread.sleep(this.getReconnectInterval()); 
     } 
     else 
      break; 
    } 
} 

private void releaseConn(Connection conn) { 
    try { 
     if (conn != null) 
      conn.close(); 
    } 
    catch (Exception e) { 
     // simply ignore this exception 
    } 
} 

이 더 큐는 게시자에서 선언되지 않습니다. 그러나 첫 번째 테스트의 3 단계에서 내구성 큐가 선언되었으며 메시지는 내구성도 있습니다. 다시 연결하기 전에 메시지가 손실되는 이유를 이해할 수 없습니다.

+0

RabbitMQ를 다시 시작한 후에 대기열이 있습니까? 메시지를 게시하는 교환기에 연결되어 있습니까? – kzhen

+0

예, 대기열은 내구성이 있습니다. 두 번째 테스트에서는 rabbitMQ 브로커를 다시 시작한 후 소비자가 메시지를 성공적으로 받았습니다. – Ramon

답변

6

게시 할 때 대기열이 없으면 메시지가 손실됩니다. RMQ 서버를 다시 시작한 후에도 메시지를 게시하기 전에 같은 큐에 연결하고 있습니까? 아니면 내구성이 있습니까?솔루션처럼 들리는 것은 중 하나입니다

  1. 만들기 큐 내구성과
  2. 게시하기 전에 큐가 생성되어 있는지를 다시 시작하면 그 큐에 다시 연결합니다.

또한 소비자의 올바른 대기열에 다시 연결해야합니다. 1) 아마 두 솔루션 중 더 낫다.

+0

rabbitmq 서버를 다시 시작한 후 사용자가 다시 연결하기 전의 'list_queues'명령의 결과는 "대기열 나열 ... com.mpos.lottery.te.thirdpartyservice.amqp.RabbitMessagePublisher 0 ... 완료되었습니다." 그것은 소비자가 선언 한 대기열입니다. – Ramon

8

아, 원인을 찾았습니다 ... 메시지와 대기열은 확실히 내구성이 있지만 교환은 내구성이 없습니다. 교환은 내구성이 없으므로 RabbitMQ 브로커를 다시 시작하면 대기열과 교환 간 바인딩 정보가 손실됩니다.

이제 교환을 내구성으로 선언하고 소비자가 다시 시작하기 전에 그리고 브로커를 다시 시작하기 전에 게시 한 메시지를 얻을 수 있습니다.

0

3.1에서 3.3으로 업그레이드하면 내 RabbitMQ 클러스터에서 발생합니다. 그리고 내 해결책은 /var/lib/rabbitmq/mnesia 디렉토리를 제거하는 것입니다.

관련 문제