2017-02-07 5 views
1

Active MQ 게시자 및 구독자를 C#으로 구현하고 있습니다. 나는 브로커와 통신하기 위해 Apache.NMS.ActiveMQ.net 클라이언트 라이브러리를 사용하고 있습니다.장애 조치 설정이있는 ActiveMQ의 승인되지 않은 메시지가 구독자에게 다시 전달되지 않습니다.

<package id="Apache.NMS" version="1.7.1" targetFramework="net461" /> 
    <package id="Apache.NMS.ActiveMQ" version="1.7.2" targetFramework="net461" /> 

의 ActiveMQ가 4 개 서버에 설치 장애로 구성된 (0.225, 0.226, 0.346, 0.347 - 참조를 위해 IP의 마지막 부분). // TCP : //103.24.34.225 : 61616, TCP : //103.24.34.226 : 61616, TCP : //103.24.34.346 : 61616, TCP를 : 브로커 URL은

페일 오버과 같이 보입니다 // 나는이 주제에 가입하고 방법은 다음과

var brokerUrl = "failover://tcp://103.24.34.225:61616,tcp://103.24.34.226:61616,tcp://103.24.34.346:61616,tcp://103.24.34.347:61616"; 
    var connectionFactory = new ConnectionFactory(brokerUrl); 
    using (var connection = connectionFactory.CreateConnection("conn1", "conn$34")) 
    { 
     connection.ClientId = "TESTPUBLISHER"; 
     connection.AcknowledgementMode = AcknowledgementMode.ClientAcknowledge; 
     connection.Start(); 

     var session = connection.CreateSession(); 
     var topic = new ActiveMQTopic("ACCOUNT.UPDATE"); 
     var producer = session.CreateProducer(topic); 


     var msg = "43342_test"; //DateTime.Now.ToString("yyyyMdHHmmss_fff") + "-TEST"; 
     var textMessage = producer.CreateTextMessage(msg); 

     textMessage.Properties.SetString("Topic", "ACCOUNT.UPDATE"); 
     textMessage.Properties.SetString("Action", "UPDATE"); 
     textMessage.Properties.SetString("DataContractType", "Account"); 

     producer.Send(textMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, new TimeSpan(0, 0, 60, 0, 0)); 

    } 

가 게시하고 방법은 다음과 103.24.34.347:61616

이다. 이 코드는 여러 공유 가입자가 들어오는 메시지를 수신 할 수 있도록 설정되어 있습니다. 가상 토픽을 사용하여이를 수행해야한다고 들었습니다. 따라서 Virtual Topic을 사용하도록 구독자를 구성했으며 Windows Service 프로젝트 내에서 호스팅됩니다. 나는 확인 응답 모드를 사용하여 ClientAcknowledge를 사용하고 있으므로 메시지가 확인되지 않으면 메시지가 계속 되돌아 와야합니다. 아래의 코드 스 니펫은 Windows 서비스의 중요한 가입자 부분을 나타냅니다.

var brokerUrl = "failover://tcp://103.24.34.225:61616,tcp://103.24.34.226:61616,tcp://103.24.34.346:61616,tcp://103.24.34.347:61616"; 
IConnectionFactory factory = new ConnectionFactory(new Uri(brokerUrl)); 
IConnection connection = factory.CreateConnection("conn1", "conn$34")) 

    connection.ClientId = "TESTSUBSCRIBER"; 
    connection.AcknowledgementMode = AcknowledgementMode.ClientAcknowledge; 
    connection.ConnectionInterruptedListener += OnConnectionInturrupted; 
    connection.ExceptionListener += OnConnectionExceptionListener; 
    connection.ConnectionResumedListener += OnConnectionResumedListener; 
    connection.Start(); 

    ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge); 
    var queue = new ActiveMQQueue("VT.TESTSUBSCRIBER.ACCOUNT.UPDATE"); 
    ActiveMQTopic topic = new ActiveMQTopic(); 
    IMessageConsumer consumer = session.CreateConsumer(queue); 

    consumer.Listener += OnMessage; 



private void OnMessage(IMessage message) 
{ 
    var payload = ((ITextMessage)message).Text; 
    Log.Info($"Received message for Client TESTSUBSCRIBER - [{payload}]"); 
    if(payload != "43342_test") 
    { 
     message.Acknowledge(); 
      Log.Info($"Message acknowledged for Client TESTSUBSCRIBER - [{payload}]"); 
    } 
} 

private void OnConnectionResumedListener() 
{ 
    Log.Info($"Subscriber connection resumed for Client TESTSUBSCRIBER"); 
} 

private void OnConnectionExceptionListener(Exception exception) 
{ 
    Log.Error(exception); 
} 

private void OnConnectionInturrupted() 
{ 
    Log.Error($"Subscriber connection interrupted for Client TESTSUBSCRIBER"); 
} 

게시 및 구독자 메시지를 게시 할 수 있습니다. 한 가지 특별한 경우에 문제가 있습니다. 가입자가 장애 조치 (failover) 서버 풀에서 (.225 브로커 서버)에 대한 연결을 설정했다고 가정 해 보겠습니다. 게시자가 메시지를 게시했습니다. 구독자가 수신했으며 처리 중입니다. 그러나 일부 서버 패치 유지 관리로 인해 Windows 서비스는 종료되어야했습니다. 그 결과 브로커에 대한 가입자 연결이 끊어졌습니다. Windows 서비스가 다시 시작되면이 시간 가입자는 페일 오버 풀에서 다른 브로커 서버 (.346 브로커 서버)에 대한 연결을 설정했습니다. 그러한 일이 발생했을 때 미확인 메시지는 결코 다시 전달되지 않았습니다. 그러나 Windows 서비스를 다시 시작한 경우 .225 브로커 (가입자가 처음에 연결된 서버)에 연결되면 이제 구독자는 승인되지 않은 메시지를 수신합니다.

내 가정은 장애 조치 설정에서 ActiveMQ를 구성 할 때 구독자가 연결을 설정하는 장애 조치 (failover) 풀의 브로커 서버와 상관없이 항상 수신 확인되지 않은 메시지를 수신해야한다는 것입니다.

일부 상황에서는 장애 조치 설정이 작동하는 것처럼 보입니다. 구독자가 페일 오버 풀에서 .346 브로커 서버에 연결되었다고 가정합니다. 게시자가 동일한 풀에서 다른 브로커 서버 (.225 브로커)에 연결되어 메시지를 게시하면 구독자가 메시지를 수신합니다. 이것은 Fail over setup이 작동하고 있음을 증명합니다.

그러나 구독자가 브로커 서버에서 메시지를 수신하고 메시지를 확인하기 전에 구독자의 연결이 끊어지면 승인되지 않은 메시지를 수신하기 위해 동일한 브로커 서버에 대한 연결을 다시 설정해야합니다. 이 말이 내게 맞지는 않습니다.

이 사용 사례를 사용하기 위해 Active MQ 서버 설치에 추가 구성이 필요합니까?

답변

0

이 문제점에 대한 해결책은 클라이언트 측이 아니라 Active MQ 서버 구성과 관련됩니다.

생산자 흐름 제어 대상 정책의 경우 ConditionalNetworkBridgeFilter를 추가하고 replayWhenNoConsumers를 활성화하십시오.