2015-01-16 2 views
9

나는 봄 AMQP의 5.0에서 다음과 같은 시나리오를 테스트하고 있으며 네트워크 중단 후 재 연결을 실패봄 AMQP 5.0 - 네트워크 장애에 토끼 재 연결 문제

  1. 시작 메시지를 사용 봄 응용 프로그램 토끼를 비동기 적으로 사용 : listener-container 및 rabbit : connection-factory (자세한 구성은 다음을 참조).
  2. 로그는 응용 프로그램이 메시지를 성공적으로 수신하고 있음을 나타냅니다. 토끼 서버에 인바운드 네트워크 트래픽을 드롭하여 응용 프로그램에 보이지 않는
  3. 만들기 RabbitMQ : (시간 제한에 대한 네트워크 연결을위한) 최소한 3 분 sudo iptables -A INPUT -p tcp --destination-port 5672 -j DROP
  4. 기다립니다.
  5. 연결을 수정하십시오. sudo iptables -D INPUT -p tcp --destination-port 5672 -j DROP
  6. 잠시 기다리거나 (한 시간 이상 시도해도) 다시 연결하지 마십시오.
  7. 응용 프로그램을 다시 시작하면 메시지 수신이 다시 시작되어 네트워크가 정상으로 돌아 왔습니다.

또한 iptables 드롭 대신 VM 네트워크 어댑터 연결 끊기와 동일한 시나리오를 테스트했으며 같은 결과가 발생합니다. 즉 자동 재 연결이 필요하지 않습니다. 흥미롭게도 iptables를 시도 할 때 REJECT, DROP 대신 예상대로 작동하며 거부 규칙을 제거하는 즉시 응용 프로그램이 다시 시작되지만 거부는 네트워크 오류보다 서버 오류와 비슷하다고 생각합니다. reference document에 따르면

하십시오 MessageListener를 때문에 비즈니스 예외에 실패 할 경우

는 예외는 메시지 리스너 컨테이너에 의해 처리 한 후 다른 메시지를 듣기로 돌아갑니다. 연결이 끊어 (비즈니스 예외가 아 T) 장애가 발생하면 리스너에 대한 메시지를 수집하는 사용자를 취소하고 다시 시작해야합니다. SimpleMessageListenerContainer는이를 원활하게 처리하며 리스너가 다시 시작되고 있음을 알리는 로그를 남깁니다. 사실 그것은 소비자를 다시 시작하려고 끊임없이 반복합니다. 소비자가 매우 심하게 행동 한 경우에만 포기할 것입니다. 한 가지 부작용은 컨테이너가 시작될 때 브로커가 다운되면 연결이 설정 될 때까지 계속 시도하는 것입니다.

내가 분리 후 분 정도 얻을 로그입니다 :

2015-01-16 14:00:42,433 WARN [SimpleAsyncTaskExecutor-5] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer raised exception, processing can restart if the connection factory supports it 
com.rabbitmq.client.ShutdownSignalException: connection error 
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:717) ~[amqp-client-3.4.2.jar:na] 
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:707) ~[amqp-client-3.4.2.jar:na] 
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:565) ~[amqp-client-3.4.2.jar:na] 
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55] 
Caused by: java.io.EOFException: null 
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290) ~[na:1.7.0_55] 
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) ~[amqp-client-3.4.2.jar:na] 
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139) ~[amqp-client-3.4.2.jar:na] 
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:534) ~[amqp-client-3.4.2.jar:na] 
    ... 1 common frames omitted 

그리고 몇 초 재 연결 한 후이 로그 메시지가 :

2015-01-16 14:18:14,551 WARN [SimpleAsyncTaskExecutor-2] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection timed out 

UPDATE : 이상하게도 org.springframework.amqp 패키지에 DEBUG 로깅을 활성화하면 재 연결이 성공적으로 수행되어 더 이상 문제를 재현 할 수 없습니다!

디버그 로깅을 사용하지 않으면 스프링 AMQP 코드를 디버깅하려고했습니다. iptables drop이 제거 된 직후, SimpleMessageListenerContainer.doStop() 메서드가 호출되어 shutdown()을 호출하고 모든 채널을 취소하는 것을 관찰했습니다.2

2015-01-20 15:28:44,200 ERROR [pool-1-thread-16] org.springframework.amqp.rabbit.connection.CachingConnectionFactory Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'e4288669-2422-40e6-a2ee-b99542509273' in vhost '/', class-id=50, method-id=10) 
2015-01-20 15:28:44,243 WARN [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Failed to declare queue:e4288669-2422-40e6-a2ee-b99542509273 
2015-01-20 15:28:44,243 WARN [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Queue declaration failed; retries left=0 
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[e4288669-2422-40e6-a2ee-b99542509273] 
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:486) ~[spring-rabbit-1.4.2.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:401) ~[spring-rabbit-1.4.2.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1022) [spring-rabbit-1.4.2.RELEASE.jar:na] 
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55] 
2015-01-20 15:28:49,245 ERROR [pool-1-thread-16] org.springframework.amqp.rabbit.connection.CachingConnectionFactory Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'e4288669-2422-40e6-a2ee-b99542509273' in vhost '/', class-id=50, method-id=10) 
2015-01-20 15:28:49,283 WARN [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Failed to declare queue:e4288669-2422-40e6-a2ee-b99542509273 
2015-01-20 15:28:49,300 ERROR [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer received fatal exception on startup 
org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it. 
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:429) ~[spring-rabbit-1.4.2.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1022) ~[spring-rabbit-1.4.2.RELEASE.jar:na] 
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55] 
Caused by: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[e4288669-2422-40e6-a2ee-b99542509273] 
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:486) ~[spring-rabbit-1.4.2.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:401) ~[spring-rabbit-1.4.2.RELEASE.jar:na] 
    ... 2 common frames omitted 
2015-01-20 15:28:49,301 ERROR [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Stopping container from aborted consumer 

UPDATE : 나는 원인과 관련이있을 것으로 보인다 doStop()에 중단 점 넣을 때 나는 또한이 로그 메시지를 받았습니다requested-heartbeat 30 초를 설정 한 후, 답변에 제안을의 재접속은 대부분의 경우에 효과적이었으며 팬 아웃 교환기에 묶여있는 독점적 인 임시 큐를 재정의하는데 성공했지만 여전히 때때로 재접속하지 못합니다. 거의 발생하지는 않지만, 테스트 도중 RabbitMQ 관리 콘솔을 모니터링하여 새 연결이 설정되었지만 (이전 연결이 시간 초과로 제거 된 후) 재 연결 후 독점적 임시 대기열이 재정의되지 않은 것을 관찰했습니다. 또한 클라이언트가 메시지를받지 못했습니다. 덜 자주 발생하므로 문제를 신뢰성있게 재현하는 것이 현재 매우 어렵습니다. 아래에 전체 구성을 제공했으며 이제는 대기열 선언을 포함합니다.

UPDATE 3 : 전용 임시 대기열을 자동 삭제 대기열로 바꾼 후에도 동일한 동작이 가끔 발생합니다. 즉, 재 연결 후 명명 된 대기열 자동 삭제가 다시 정의되지 않고 응용 프로그램이 다시 시작될 때까지 어떠한 메시지도 수신되지 않습니다.

누군가 나를 도와 줄 수 있다면 정말 고맙겠습니다.

<!-- Create a temporary exclusive queue to subscribe to the control exchange --> 
<rabbit:queue id="control-queue"/> 

<!-- Bind the temporary queue to the control exchange --> 
<rabbit:fanout-exchange name="control"> 
    <rabbit:bindings> 
     <rabbit:binding queue="control-queue"/> 
    </rabbit:bindings> 
</rabbit:fanout-exchange> 

<!-- Subscribe to the temporary queue --> 
<rabbit:listener-container connection-factory="connection-factory" 
          acknowledge="none" 
          concurrency="1" 
          prefetch="1"> 
    <rabbit:listener queues="control-queue" ref="controlQueueConsumer"/> 

</rabbit:listener-container> 

<rabbit:connection-factory id="connection-factory" 
          username="${rabbit.username}" 
          password="${rabbit.password}" 
          host="${rabbit.host}" 
          virtual-host="${rabbit.virtualhost}" 
          publisher-confirms="true" 
          channel-cache-size="100" 
          requested-heartbeat="30" /> 

<rabbit:admin id="admin" connection-factory="connection-factory"/> 

<rabbit:queue id="qu0-id" name="qu0"> 
    <rabbit:queue-arguments> 
     <entry key="x-dead-letter-exchange" value="dead-letter"/> 
    </rabbit:queue-arguments> 
</rabbit:queue> 

<rabbit:topic-exchange id="default-exchange" name="default-ex" declared-by="admin"> 
    <rabbit:bindings> 
     <rabbit:binding queue="qu0" pattern="p.0"/> 
    </rabbit:bindings> 
</rabbit:topic-exchange> 

<rabbit:listener-container connection-factory="connection-factory" 
          acknowledge="manual" 
          concurrency="4" 
          prefetch="30"> 
    <rabbit:listener queues="qu0" ref="queueConsumerComponent"/> 
</rabbit:listener-container> 
+0

가장 초기의 봄 AMQP 버전에는 문제가 없다는 것을 의미하지 않습니까? –

+0

'DEBUG' 레벨의'org.springframework.amqp.rabbit.listener' 카테고리에 대한 로그를 공유 하시겠습니까? BTW 필자는 Windows에서'tcpTrace'와 유사한 (또는 아님?) emilation을 시도해 보았고 로그에서 유사하게'Caused by : java.io.EOFException : null at java.io.DataInputStream.readUnsignedByte'를 보았습니다. 하지만'trace '를 다시 시작하면 연결이 복원됩니다. 내 AMQP 클라이언트는 Spring AMQP로부터의 '3.4.2' 전이 의존성이다. –

+0

Spring AMQP에만 국한되지 않지만 큐와 같은 리소스를 재 연결하고 복구하는 기능이 있다면 나중에 [Lyra] (https://github.com/jhalterman/lyra)를 사용해 볼 수도 있습니다. – Jonathan

답변

4

난 그냥 설명 (패킷을 삭제하는 iptables를 사용하여 리눅스에 토끼)로 테스트를 실행 : 여기

내가에 의존하고있는 스프링 AMQP 구성입니다.

연결이 다시 설정되면 로그가 표시되지 않습니다 (아마도 가능할 것입니다).

다시 연결을 보려면 디버그 로깅을 사용하는 것이 좋습니다.

편집 : rabbitmq 문서에서

:

독점 독점 큐는 현재 연결에 액세스 할 수 있으며, 그 연결이 닫힐 때 삭제됩니다. 다른 연결에 의한 배타적 큐의 패시브 선언은 허용되지 않습니다. 귀하의 예외에서

:

응답 코드 = 405, 답장을 텍스트 = RESOURCE_LOCKED - 가상 호스트에서 'e4288669-2422-40e6-a2ee-b99542509273'잠금 큐에 단독으로 액세스를 얻을 수 없습니다 '/ ', class-id = 50, method-

따라서 브로커는 여전히 다른 연결이 있다고 생각합니다.

  1. 독점적 인 대기열을 사용하지 마십시오 (대기열에있는 메일을 잃어 버리게 됨). 또는
  2. 낮은 requestedHeartbeat으로 설정하면 브로커가 손실 된 연결을 더 빠르게 감지합니다.
+0

Gary에게 감사드립니다. 디버그 로깅을 시도하고 더 많은 정보로 질문을 업데이트했습니다. 다시 연결 한 직후 대기열 다시 선언이 실패하고 SimpleMessageListenerContainer가 종료됩니다. –

+0

나는 대답을 편집했다. 앞으로는 대기열을 포함하여 __all__ 구성을 표시하십시오. –

+0

죄송하지만 다시 연결하지 못하는 문제가 계속 발생하고 있습니다. 세부 사항 및 전체 구성으로 질문을 업데이트했습니다. –

2
우리는 또한뿐만 아니라 우리의 생산 환경에서이 문제를 직면하고있다

때문에 우리가 발견 한 해결 방법은 우리의 클라이언트 응용 프로그램이 지속적으로 재 연결을 계속 시도해야하는 것이 었 등 다른 ESX 랙에 가상 머신으로 실행 토끼 노드의 수 있습니다 클러스터에서 연결이 끊어지면 다음은 우리가 적용되는 설정입니다 그것은 일 :

<util:properties id="spring.amqp.global.properties"> 
    <prop key="smlc.missing.queues.fatal">false</prop> 
</util:properties> 

이 속성은 봄 AMQP의 글로벌 동작을 변경 큐를 선언하는 것은 치명적인 오류 (사용할 수없는 브로커 등)에 대한 실패 할 때. 기본적으로 컨테이너는 3 번만 시도합니다 ("retries left = 0"을 나타내는 로그 메시지 참조).

심판 : http://docs.spring.io/spring-amqp/reference/htmlsingle/#containerAttributes

는 또한, 추가 회복 간격이되도록 용기가 치명적 오류를 복구한다. 그러나 글로벌 동작이 치명적인 오류 (예 : 누락 된 큐)를 다시 시도하는 경우에도 동일한 구성이 사용됩니다.

<rabbit:listener-container recovery-interval="15000" connection-factory="consumerConnectionFactory"> 
.... 
</rabbit:listener-container> 
0

위해 SimpleMessageListenerContainerConnectionFactorysetMissingQueuesFatal(false)로 설정 setRequestedHeartBeat 무기한 연결을 다시 시도합니다. 디폴트에서는, SimpleMessageListenerContainer setMissingQueuesFatal가 true로 설정되어 3 회까지 재 시행됩니다.

@Bean 
    public ConnectionFactory connectionFactory() { 
    final CachingConnectionFactory connectionFactory = new CachingConnectionFactory(getHost(), getPort()); 
    connectionFactory.setUsername(getUsername()); 
    connectionFactory.setPassword(getPassword()); 
    connectionFactory.setVirtualHost(getVirtualHost()); 
    connectionFactory.setRequestedHeartBeat(30); 
    return connectionFactory; 
    } 

    @Bean 
    public SimpleMessageListenerContainer listenerContainerCopernicusErrorQueue() { 
    final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 
    container.setConnectionFactory(connectionFactory()); 
    container.setQueueNames(myQueue().getName()); 
    container.setMessageListener(messageListenerAdapterQueue()); 
    container.setDefaultRequeueRejected(false); 
    container.setMissingQueuesFatal(false); 
    return container; 
    }