나는 봄 AMQP의 5.0에서 다음과 같은 시나리오를 테스트하고 있으며 네트워크 중단 후 재 연결을 실패봄 AMQP 5.0 - 네트워크 장애에 토끼 재 연결 문제
- 시작 메시지를 사용 봄 응용 프로그램 토끼를 비동기 적으로 사용 : listener-container 및 rabbit : connection-factory (자세한 구성은 다음을 참조).
- 로그는 응용 프로그램이 메시지를 성공적으로 수신하고 있음을 나타냅니다. 토끼 서버에 인바운드 네트워크 트래픽을 드롭하여 응용 프로그램에 보이지 않는
- 만들기 RabbitMQ : (시간 제한에 대한 네트워크 연결을위한) 최소한 3 분
sudo iptables -A INPUT -p tcp --destination-port 5672 -j DROP
- 기다립니다.
- 연결을 수정하십시오.
sudo iptables -D INPUT -p tcp --destination-port 5672 -j DROP
- 잠시 기다리거나 (한 시간 이상 시도해도) 다시 연결하지 마십시오.
- 응용 프로그램을 다시 시작하면 메시지 수신이 다시 시작되어 네트워크가 정상으로 돌아 왔습니다.
또한 iptables 드롭 대신 VM 네트워크 어댑터 연결 끊기와 동일한 시나리오를 테스트했으며 같은 결과가 발생합니다. 즉 자동 재 연결이 필요하지 않습니다. 흥미롭게도 iptables를 시도 할 때 REJECT, DROP 대신 예상대로 작동하며 거부 규칙을 제거하는 즉시 응용 프로그램이 다시 시작되지만 거부는 네트워크 오류보다 서버 오류와 비슷하다고 생각합니다. reference document에 따르면
하십시오 MessageListener를 때문에 비즈니스 예외에 실패 할 경우는 예외는 메시지 리스너 컨테이너에 의해 처리 한 후 다른 메시지를 듣기로 돌아갑니다. 연결이 끊어 (비즈니스 예외가 아 T) 장애가 발생하면 리스너에 대한 메시지를 수집하는 사용자를 취소하고 다시 시작해야합니다. SimpleMessageListenerContainer는이를 원활하게 처리하며 리스너가 다시 시작되고 있음을 알리는 로그를 남깁니다. 사실 그것은 소비자를 다시 시작하려고 끊임없이 반복합니다. 소비자가 매우 심하게 행동 한 경우에만 포기할 것입니다. 한 가지 부작용은 컨테이너가 시작될 때 브로커가 다운되면 연결이 설정 될 때까지 계속 시도하는 것입니다.
이
내가 분리 후 분 정도 얻을 로그입니다 :2015-01-16 14:00:42,433 WARN [SimpleAsyncTaskExecutor-5] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer raised exception, processing can restart if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error
at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:717) ~[amqp-client-3.4.2.jar:na]
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:707) ~[amqp-client-3.4.2.jar:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:565) ~[amqp-client-3.4.2.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: java.io.EOFException: null
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290) ~[na:1.7.0_55]
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) ~[amqp-client-3.4.2.jar:na]
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139) ~[amqp-client-3.4.2.jar:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:534) ~[amqp-client-3.4.2.jar:na]
... 1 common frames omitted
그리고 몇 초 재 연결 한 후이 로그 메시지가 :
2015-01-16 14:18:14,551 WARN [SimpleAsyncTaskExecutor-2] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection timed out
UPDATE : 이상하게도 org.springframework.amqp 패키지에 DEBUG 로깅을 활성화하면 재 연결이 성공적으로 수행되어 더 이상 문제를 재현 할 수 없습니다!
디버그 로깅을 사용하지 않으면 스프링 AMQP 코드를 디버깅하려고했습니다. iptables drop이 제거 된 직후, SimpleMessageListenerContainer.doStop()
메서드가 호출되어 shutdown()을 호출하고 모든 채널을 취소하는 것을 관찰했습니다.2
2015-01-20 15:28:44,200 ERROR [pool-1-thread-16] org.springframework.amqp.rabbit.connection.CachingConnectionFactory Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'e4288669-2422-40e6-a2ee-b99542509273' in vhost '/', class-id=50, method-id=10)
2015-01-20 15:28:44,243 WARN [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Failed to declare queue:e4288669-2422-40e6-a2ee-b99542509273
2015-01-20 15:28:44,243 WARN [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Queue declaration failed; retries left=0
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[e4288669-2422-40e6-a2ee-b99542509273]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:486) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:401) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1022) [spring-rabbit-1.4.2.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
2015-01-20 15:28:49,245 ERROR [pool-1-thread-16] org.springframework.amqp.rabbit.connection.CachingConnectionFactory Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'e4288669-2422-40e6-a2ee-b99542509273' in vhost '/', class-id=50, method-id=10)
2015-01-20 15:28:49,283 WARN [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Failed to declare queue:e4288669-2422-40e6-a2ee-b99542509273
2015-01-20 15:28:49,300 ERROR [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer received fatal exception on startup
org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:429) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1022) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[e4288669-2422-40e6-a2ee-b99542509273]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:486) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:401) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
... 2 common frames omitted
2015-01-20 15:28:49,301 ERROR [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Stopping container from aborted consumer
UPDATE : 나는 원인과 관련이있을 것으로 보인다 doStop()에 중단 점 넣을 때 나는 또한이 로그 메시지를 받았습니다requested-heartbeat
30 초를 설정 한 후, 답변에 제안을의 재접속은 대부분의 경우에 효과적이었으며 팬 아웃 교환기에 묶여있는 독점적 인 임시 큐를 재정의하는데 성공했지만 여전히 때때로 재접속하지 못합니다. 거의 발생하지는 않지만, 테스트 도중 RabbitMQ 관리 콘솔을 모니터링하여 새 연결이 설정되었지만 (이전 연결이 시간 초과로 제거 된 후) 재 연결 후 독점적 임시 대기열이 재정의되지 않은 것을 관찰했습니다. 또한 클라이언트가 메시지를받지 못했습니다. 덜 자주 발생하므로 문제를 신뢰성있게 재현하는 것이 현재 매우 어렵습니다. 아래에 전체 구성을 제공했으며 이제는 대기열 선언을 포함합니다.
UPDATE 3 : 전용 임시 대기열을 자동 삭제 대기열로 바꾼 후에도 동일한 동작이 가끔 발생합니다. 즉, 재 연결 후 명명 된 대기열 자동 삭제가 다시 정의되지 않고 응용 프로그램이 다시 시작될 때까지 어떠한 메시지도 수신되지 않습니다.
누군가 나를 도와 줄 수 있다면 정말 고맙겠습니다.
<!-- Create a temporary exclusive queue to subscribe to the control exchange -->
<rabbit:queue id="control-queue"/>
<!-- Bind the temporary queue to the control exchange -->
<rabbit:fanout-exchange name="control">
<rabbit:bindings>
<rabbit:binding queue="control-queue"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!-- Subscribe to the temporary queue -->
<rabbit:listener-container connection-factory="connection-factory"
acknowledge="none"
concurrency="1"
prefetch="1">
<rabbit:listener queues="control-queue" ref="controlQueueConsumer"/>
</rabbit:listener-container>
<rabbit:connection-factory id="connection-factory"
username="${rabbit.username}"
password="${rabbit.password}"
host="${rabbit.host}"
virtual-host="${rabbit.virtualhost}"
publisher-confirms="true"
channel-cache-size="100"
requested-heartbeat="30" />
<rabbit:admin id="admin" connection-factory="connection-factory"/>
<rabbit:queue id="qu0-id" name="qu0">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="dead-letter"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange id="default-exchange" name="default-ex" declared-by="admin">
<rabbit:bindings>
<rabbit:binding queue="qu0" pattern="p.0"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<rabbit:listener-container connection-factory="connection-factory"
acknowledge="manual"
concurrency="4"
prefetch="30">
<rabbit:listener queues="qu0" ref="queueConsumerComponent"/>
</rabbit:listener-container>
가장 초기의 봄 AMQP 버전에는 문제가 없다는 것을 의미하지 않습니까? –
'DEBUG' 레벨의'org.springframework.amqp.rabbit.listener' 카테고리에 대한 로그를 공유 하시겠습니까? BTW 필자는 Windows에서'tcpTrace'와 유사한 (또는 아님?) emilation을 시도해 보았고 로그에서 유사하게'Caused by : java.io.EOFException : null at java.io.DataInputStream.readUnsignedByte'를 보았습니다. 하지만'trace '를 다시 시작하면 연결이 복원됩니다. 내 AMQP 클라이언트는 Spring AMQP로부터의 '3.4.2' 전이 의존성이다. –
Spring AMQP에만 국한되지 않지만 큐와 같은 리소스를 재 연결하고 복구하는 기능이 있다면 나중에 [Lyra] (https://github.com/jhalterman/lyra)를 사용해 볼 수도 있습니다. – Jonathan