2011-01-06 4 views
20

메시지를 생성하고 고유 대기열로 JMS 대기열을 통해 보내는 JMS 클라이언트가 있습니다.JMS - 하나 이상의 소비자로 이동

내가 원하는 것은 둘 이상의 소비자가 이러한 메시지를받는 것입니다. 내 마음에 오는 첫 번째 일은 주제를 대기열로 변환하는 것입니다. 따라서 현재 및 신규 소비자는 구독하고 모든 사용자에게 동일한 메시지를 전달할 수 있습니다.

이것은 분명히 사물의 생산자와 소비자 측면 모두에서 현재의 클라이언트 코드를 수정하는 것과 관련이 있습니다.

기존 소비자를 수정할 필요가 없도록 두 번째 대기열 만들기와 같은 다른 옵션도 살펴보고 싶습니다. 필자는이 접근법에서 장점이 있다고 생각합니다. (잘못된 것이라면 올바르게 수정하십시오) 하나가 아닌 두 개의 다른 대기열간에로드 균형을 조정하면 성능에 긍정적 인 영향을 미칠 수 있습니다.

본인은 이러한 옵션과 단점에 대해 조언을 드리고 싶습니다. 모든 의견은 높이 평가됩니다.

답변

45

명시된대로 몇 가지 옵션이 있습니다.

동일한 효과를 얻기 위해 주제로 변환하는 경우 소비자에게 지속적인 소비자를 만들어야합니다. 큐가 제공하는 한 가지는 소비자가 살아 있지 않으면 지속성입니다. 이는 사용중인 MQ 시스템에 따라 다릅니다.

큐를 사용하려면 각 소비자에 대한 큐와 원래 큐에서 수신 대기하는 디스패처를 만듭니다. 주제

  • 쉬운의

    Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1 
                 -> Queue_Consumer_2 <- Consumer_2 
                 -> Queue_Consumer_3 <- Consumer_3 
    

    장점은 동적으로 새로운 소비자를 추가합니다. 모든 소비자는 아무런 작업없이 새로운 메시지를받습니다.

  • Consumer_1에서 메시지, Consumer_2, Consumer_3을 차례로 수신 할 수 있도록 라운드 로빈 주제를 만들 수 있습니다.
  • 대기열을 쿼리하여 반응 적으로 만들지 않고도 새로운 메시지를 푸시 할 수 있습니다. 브로커는이 구성을 지원하지 않는 항목

    • 메시지의

    단점은 지속되지 않습니다. 소비자가 오프라인 상태가되어 다시 돌아 오면 영구 사용자가 설정하지 않으면 메시지를 잃어 버릴 수 있습니다.

  • Consumer_1 및 Consumer_2가 Consumer_3이 아닌 메시지를 수신하는 것을 어렵게 만듭니다.Dispatcher 및 큐의 경우 Dispatcher는 Consumer_3의 큐에 메시지를 넣을 수 없습니다. 소비자는 소비자가 각각의 소비자 큐에 메시지를 배치하지 않음으로써 어떤 메시지를 얻을 수있는 필터링 할 수 있습니다 그들에게
  • 디스패처를 제거 할 때까지 대기열

프로는

  • 메시지는 영구적입니다. 이것은 필터를 통해 주제로 할 수 있습니다. 큐의

단점

  • 추가 큐는 다수의 소비자를 지원하기 위해 생성 될 필요가있다. 동적 환경에서는 이것이 효율적이지 않습니다.

메시징 시스템을 개발할 때 가장 큰 힘을 주지만 이미 대기열을 사용함에 따라 항목을 선호하는 대신 시스템에서 주제를 구현하는 방법을 변경해야합니다. 여러 소비자

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1 
             -> Queue_Consumer_2 <- Consumer_2 
             -> Queue_Consumer_3 <- Consumer_3 

당신이 문제의 예외 처리 등의 처리를 수행해야합니다 다른 일이있다 명심 소스와

설계 및 큐 시스템의 구현, 연결이 끊어지면 연결과 대기열에 다시 연결하는 등의 작업을 수행 할 수 있습니다. 이는 내가 설명한 내용을 수행하는 방법에 대한 아이디어를 제공하기위한 것입니다.

실제 시스템에서는 아마도 첫 번째 예외에서 빠져 나오지 않을 것입니다. 나는 시스템이 최선의 운영을 계속하고 오류를 기록하도록 허용 할 것이다. 이 코드 에서처럼 단일 소비자 대기열에 메시지를 넣지 못하면 전체 디스패처가 중지됩니다.

Dispatcher.java

/* 
* To change this template, choose Tools | Templates 
* and open the template in the editor. 
*/ 
package stackoverflow_4615895; 

import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageProducer; 
import javax.jms.Queue; 
import javax.jms.QueueConnection; 
import javax.jms.QueueConnectionFactory; 
import javax.jms.QueueSession; 
import javax.jms.Session; 

public class Dispatcher { 

    private static long QUEUE_WAIT_TIME = 1000; 
    private boolean mStop = false; 
    private QueueConnectionFactory mFactory; 
    private String mSourceQueueName; 
    private String[] mConsumerQueueNames; 

    /** 
    * Create a dispatcher 
    * @param factory 
    *  The QueueConnectionFactory in which new connections, session, and consumers 
    *  will be created. This is needed to ensure the connection is associated 
    *  with the correct thread. 
    * @param source 
    * 
    * @param consumerQueues 
    */ 
    public Dispatcher(
     QueueConnectionFactory factory, 
     String sourceQueue, 
     String[] consumerQueues) { 

     mFactory = factory; 
     mSourceQueueName = sourceQueue; 
     mConsumerQueueNames = consumerQueues; 
    } 

    public void start() { 
     Thread thread = new Thread(new Runnable() { 

      public void run() { 
       Dispatcher.this.run(); 
      } 
     }); 
     thread.setName("Queue Dispatcher"); 
     thread.start(); 
    } 

    public void stop() { 
     mStop = true; 
    } 

    private void run() { 

     QueueConnection connection = null; 
     MessageProducer producer = null; 
     MessageConsumer consumer = null; 
     QueueSession session = null; 
     try { 
      // Setup connection and queues for receiving the messages 
      connection = mFactory.createQueueConnection(); 
      session = connection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE); 
      Queue sourceQueue = session.createQueue(mSourceQueueName); 
      consumer = session.createConsumer(sourceQueue); 

      // Create a null producer allowing us to send messages 
      // to any queue. 
      producer = session.createProducer(null); 

      // Create the destination queues based on the consumer names we 
      // were given. 
      Queue[] destinationQueues = new Queue[mConsumerQueueNames.length]; 
      for (int index = 0; index < mConsumerQueueNames.length; ++index) { 
       destinationQueues[index] = session.createQueue(mConsumerQueueNames[index]); 
      } 

      connection.start(); 

      while (!mStop) { 

       // Only wait QUEUE_WAIT_TIME in order to give 
       // the dispatcher a chance to see if it should 
       // quit 
       Message m = consumer.receive(QUEUE_WAIT_TIME); 
       if (m == null) { 
        continue; 
       } 

       // Take the message we received and put 
       // it in each of the consumers destination 
       // queues for them to process 
       for (Queue q : destinationQueues) { 
        producer.send(q, m); 
       } 
      } 

     } catch (JMSException ex) { 
      // Do wonderful things here 
     } finally { 
      if (producer != null) { 
       try { 
        producer.close(); 
       } catch (JMSException ex) { 
       } 
      } 
      if (consumer != null) { 
       try { 
        consumer.close(); 
       } catch (JMSException ex) { 
       } 
      } 
      if (session != null) { 
       try { 
        session.close(); 
       } catch (JMSException ex) { 
       } 
      } 
      if (connection != null) { 
       try { 
        connection.close(); 
       } catch (JMSException ex) { 
       } 
      } 
     } 
    } 
} 

Main.java 좋은 옵션이 될 것

QueueConnectionFactory factory = ...; 

    Dispatcher dispatcher = 
      new Dispatcher(
      factory, 
      "Queue_Original", 
      new String[]{ 
       "Consumer_Queue_1", 
       "Consumer_Queue_2", 
       "Consumer_Queue_3"}); 
    dispatcher.start(); 
+0

+1 좋은 답변입니다. – skaffman

+0

대단한 답변이었습니다. JBoss의 MOM 구현 인 HornetQ를 사용하고 있습니다. –

+0

@Anonimo 마지막으로 JBoss가 JMS 사양을 준수하는지 확인했습니다. 이것은 JMS 스펙이 설명하지 않는 주제를 동적으로 작성하기 때문에 과거에 필자에게 불만을 안겨 주었다. ActiveMQ와 같은 다른 기능을 사용하면 주제를 동적으로 생성 할 수 있으며 동일한 기능을 사용하려면 JBoss에서 한 줄만 변경하면됩니다. –

4

코드를 수정할 필요가 없습니다. 그것은 당신이 그것을 쓴 방법에 달려 있습니다.

예를 들어, 코드가 QueueSender이 아닌 MessageProducer을 사용하여 메시지를 보내는 경우 대기열뿐만 아니라 주제에서도 작동합니다. 마찬가지로 QueueReceiver이 아닌 MessageConsumer을 사용한 경우

기본적으로는 JMS 애플리케이션에서 좋은 연습이 그런 경우, 그 구성의 "단순한"사정 등 MessageProducer, MessageConsumer, Destination, 같이 JMS 시스템과 상호 작용할 수 비 특정 인터페이스를 사용하는 것입니다.

+0

. 불행히도 우리는 QueueSender와 같은 특정 인터페이스를 사용하고 있습니다. 이것은 우리가 리팩토링하는 사람이라면 분명히 염두에 둘 것입니다. –

관련 문제