기본 연결이 닫히면 잠시 후에 자동으로 브로커에 다시 연결되는 소비자를 구현했습니다. 내 경우는 다음과 같습니다.RabbitMQ는 소비자가 다시 연결되기 전에 메시지를 잃습니다.
- RabbitMQ 서버를 성공적으로 시작합니다.
- 소비자를 성공적으로 시작하십시오.
- 메시지 게시 및 소비자가 성공적으로 받았습니다.
정지 RabbitMQ 서버 예외가 표시됩니다 소비자 :
com.rabbitmq.client.ShutdownSignalException : 연결 오류; 이유 : {# 메소드 (응답 코드 = 541, 응답 텍스트 = INTERNAL_ERROR, 클래스 ID = 0, 메소드 ID = 0), null, ""}.
그리고 소비자는 다시 연결하기 전에 60 초 정도 잠자기 상태입니다.
- RabbitMQ 서버를 다시 시작하십시오.
- 'list_queues'명령의 결과는 0
- 입니다. 60 초 후에 소비자는 RabbitMQ에 다시 연결되지만 이제는 6 단계에서 게시 된 메시지가 수신됩니다.
- 소비자가 세 번째 메시지를 게시하면 소비자가 성공적으로이를 받았습니다.
이 경우 다시 연결하기 전에 게시 된 모든 메시지가 손실됩니다. 또 다른 실험을했습니다.
- RabbitMQ를 시작하고 성공적으로 메시지를 게시하십시오 (소비자 프로세스가 시작되지 않음).
- RabbitMQ를 중지 한 다음 다시 시작하십시오.
- 소비자 프로세스를 시작하고 1 단계에서 게시 한 메시지를받습니다.
참고 : 소비자의 QOS는 1입니다.며칠 동안 RabbitMQ를 조사했는데, 소비자는 다시 연결하기 전에 메시지를 게시해야합니다. Pls help (Windows rabbitMQ를 기반으로 테스트를 실행했습니다.) 다음은
은 발행인 :ConnectionFactory factory = new ConnectionFactory();
factory.setHost(this.getHost());
connection = factory.newConnection();
Channel channel = connection.createChannel();
channel = conn.createChannel();
// declare a 'topic' type of exchange
channel.exchangeDeclare(exchangeName, "topic");
// Content-type "application/octet-stream", deliveryMode 2
// (persistent), priority zero
channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_BASIC, message);
connection.close();
그리고 소비자는 다음과 같습니다 :
그것은 '주제'교환이므로@Override
public void consume(final String exchangeName, final String queueName, final String routingKey,
final int qos) throws IOException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(this.getHost());
while (true) {
Connection connection = null;
try {
connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchangeName, "topic");
// declare a durable, non-exclusive, non-autodelete queue.
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
// distribute workload among all consumers, consumer will
// pre-fetch
// {qos}
// messages to local buffer.
channel.basicQos(qos);
logger.debug(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
// disable auto-ack. If enable auto-ack, RabbitMQ delivers a
// message to
// the customer it immediately removes it from memory.
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
try {
RabbitMessageConsumer.this.consumeMessage(delivery);
}
catch (Exception e) {
// the exception shouldn't affect the next message
logger.info("[IGNORE]" + e.getMessage());
}
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
catch (Exception e) {
logger.warn(e);
}
if (autoReconnect) {
this.releaseConn(connection);
logger.info("[*] Will try to reconnect to remote host(" + this.getHost() + ") in "
+ this.reconnectInterval/1000 + " seconds.");
Thread.sleep(this.getReconnectInterval());
}
else
break;
}
}
private void releaseConn(Connection conn) {
try {
if (conn != null)
conn.close();
}
catch (Exception e) {
// simply ignore this exception
}
}
이 더 큐는 게시자에서 선언되지 않습니다. 그러나 첫 번째 테스트의 3 단계에서 내구성 큐가 선언되었으며 메시지는 내구성도 있습니다. 다시 연결하기 전에 메시지가 손실되는 이유를 이해할 수 없습니다.
RabbitMQ를 다시 시작한 후에 대기열이 있습니까? 메시지를 게시하는 교환기에 연결되어 있습니까? – kzhen
예, 대기열은 내구성이 있습니다. 두 번째 테스트에서는 rabbitMQ 브로커를 다시 시작한 후 소비자가 메시지를 성공적으로 받았습니다. – Ramon