2016-10-19 3 views
1

저는 ActiveMQ에서 메시지를 가져오기 위해 비동기 onMessage() 메서드를 사용하는 애플리케이션을 작성하고 있습니다. ActiveMQ에 1000개의 메시지가 있다고 가정하고, 모든 메시지를 onMessage() 메서드에서 ConcurrentLinkedQueue에 저장한 뒤 스레드를 사용하여 ConcurrentLinkedQueue에서 꺼내려고 합니다. 하지만 ConcurrentLinkedQueue에 메시지를 단 하나도 추가하거나 꺼낼 수 없습니다. onMessage()에서 받은 TextMessage를 setter 메서드에 전달하고 있지만, getter 메서드에서는 아무것도 얻지 못합니다. 이유가 무엇이며 어떻게 해결할 수 있습니까? ActiveMQ의 onMessage() 메서드가 다른 스레드를 차단하는 것 같습니다.

public static void main(String[] args) throws InterruptedException, JMSException { 

//Create a producer 
     Thread producer = new Thread(new Producer(queue,settext)); 
     producer.start(); 
//Create a Consumer with coresize 4 and Max size 10 
     final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 10, 100, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy()); 
     executor.allowCoreThreadTimeOut(true); 

     for (int i = 0; i <count; i++) { 
      executor.execute(new Consumer(queue)); 
     } 

     **//INITIALIZE ACTIVEMQ CONFIGURATION HERE** 

     consumer.setMessageListener(new QueueMessageListener()); 
     executor.shutdown(); 
    } 

/** JMS listener that passes every delivered message to the externally declared settext holder. */
private static class QueueMessageListener implements MessageListener {

    /**
     * Stores the delivered message through the setter, which takes a TextMessage.
     *
     * @param message the delivered JMS message; it is cast to TextMessage unconditionally
     */
    @Override
    public void onMessage(Message message) {
        final TextMessage received = (TextMessage) message;
        settext.setTextmessage(received);
    }
}
    } 

//Problem here unable to produce 
/**
 * Runnable that copies the message currently held by a Settext into the queue, one time only.
 */
class Producer implements Runnable {

    ConcurrentLinkedQueue<TextMessage> queue;
    Settext settext;

    Producer(ConcurrentLinkedQueue<TextMessage> queue2, Settext settext) {
        this.settext = settext;
        this.queue = queue2;
    }

    /**
     * Prints a start banner, enqueues the held message when the getter returns one,
     * then sleeps 200 ms and returns. Any exception is printed, not rethrown.
     */
    public void run() {
        System.out.println("Producer Started");
        try {
            final boolean messageAvailable = this.settext.getTextmessage() != null;
            if (messageAvailable) {
                // Add to ConcurrentLinkedQueue (the getter is read a second time here)
                this.queue.add(this.settext.getTextmessage());
            }
            Thread.sleep(200);
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }
}

//Problem here unable to consume 
class Consumer implements Runnable { 
    ConcurrentLinkedQueue<TextMessage> queue; 

    public Consumer(ConcurrentLinkedQueue<TextMessage> queue2) { 
     this.queue = queue2; 
    } 
    public void run() { 
     TextMessage str; 
     System.out.println("Consumer Started"); 
     while ((str = queue.poll()) != null) { 
      System.out.println("Removed: " + str); 

     } 
     try { 
      Thread.currentThread().sleep(500); 
     } catch (Exception ex) { 
      ex.printStackTrace(); 
     } 
     //} 
    } 

답변

2

아래로

QueueMessageListener는 비동기적으로 실행되므로 이 코드 조각 자체는 동작하지만, 설계에 문제가 있습니다. 그 이유는 아래 코드에 번호를 붙여 적어 둔 주석을 참조하십시오. settext.setTextmessage((TextMessage) message); 를 메시지를 큐에 직접 추가하는 코드로 바꾸면, 다른 소비자가 그 TextMessage를 큐에서 꺼낼 수 있습니다. 이 방식(V2)이 더 나을 수 있지만, 가장 좋은 해결책은 아마 org.springframework.jms.listener.DefaultMessageListenerContainer를 사용하는 것입니다:

public static void main(String[] args) throws InterruptedException, JMSException { 

//Create a producer 
// 1- settext.getTextmessage() == null i suppose at this level, see 2- point 
     Thread producer = new Thread(new Producer(queue,settext)); 
     producer.start(); 
//Create a Consumer with coresize 4 and Max size 10 
     final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 10, 100, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy()); 
     executor.allowCoreThreadTimeOut(true); 

     // 3- you start consumers go to 4, note that you will only consume count messages !! 
     for (int i = 0; i <count; i++) { 
      executor.execute(new Consumer(queue)); 
     } 

     **//INITIALIZE ACTIVEMQ CONFIGURATION HERE** 

     consumer.setMessageListener(new QueueMessageListener()); 
     executor.shutdown(); 
    } 

/** JMS listener that passes every delivered message to the externally declared settext holder. */
private static class QueueMessageListener implements MessageListener {

    /**
     * Stores the delivered message through the setter, which takes a TextMessage.
     *
     * @param message the delivered JMS message; it is cast to TextMessage unconditionally
     */
    @Override
    public void onMessage(Message message) {
        final TextMessage received = (TextMessage) message;
        settext.setTextmessage(received);
        // At this point the message is considered delivered if sessionAcknowledgeModeName is
        // AUTO_ACKNOWLEDGE, and it may be lost if the asynchronous treatment fails.
    }
}
    } 

//Problem here unable to produce 
/**
 * Runnable that waits until the Settext holder contains a message and then adds that
 * message to the queue, one time only.
 */
class Producer implements Runnable {

    ConcurrentLinkedQueue<TextMessage> queue;
    Settext settext;

    Producer(ConcurrentLinkedQueue<TextMessage> queue2, Settext settext) {
        this.queue = queue2;
        this.settext = settext;
    }

    /**
     * Polls the holder every 500 ms until a message is available, then enqueues it.
     * Returns early, with the interrupt flag restored, if the thread is interrupted.
     */
    public void run() {
        System.out.println("Producer Started");
        try {
            // 2- without this wait, settext.getTextmessage() == null means nothing is
            //    enqueued and the thread simply finishes.
            TextMessage pending;
            while ((pending = this.settext.getTextmessage()) == null) {
                Thread.sleep(500);
            }
            // Add to ConcurrentLinkedQueue; the value is read once so a concurrent
            // setter cannot turn it into null between the check and the add.
            queue.add(pending);
        } catch (InterruptedException ex) {
            // Keep the interrupt visible to whoever owns this thread and stop waiting.
            Thread.currentThread().interrupt();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

//Problem here unable to consume 
class Consumer implements Runnable { 
    ConcurrentLinkedQueue<TextMessage> queue; 

    public Consumer(ConcurrentLinkedQueue<TextMessage> queue2) { 
     this.queue = queue2; 
    } 
    public void run() { 
     TextMessage str; 
     System.out.println("Consumer Started"); 
     // 4- queue.poll() == null at this level, while loop finished, thread will sleep and finish 
    // you have to add this 
     while ((str = queue.poll()) == null) { 
     try { 
      Thread.currentThread().sleep(500); 
     } catch (Exception ex) { 
      ex.printStackTrace(); 
     } 
     } 
     System.out.println("Removed: " + str); 
     //} 
    } 

V2 :

,451,515,
public static void main(String[] args) throws InterruptedException, JMSException { 

//Create a Consumer with coresize 4 and Max size 10 
     final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 10, 100, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy()); 
     executor.allowCoreThreadTimeOut(true); 

     for (int i = 0; i <count; i++) { 
      executor.execute(new Consumer(queue)); 
     } 

     **//INITIALIZE ACTIVEMQ CONFIGURATION HERE** 

     consumer.setMessageListener(new QueueMessageListener()); 
     executor.shutdown(); 
    } 

/** JMS listener that appends every delivered message to the externally declared queue. */
private static class QueueMessageListener implements MessageListener {

    /**
     * Adds the delivered message to the queue.
     *
     * @param message the delivered JMS message; it is cast to TextMessage unconditionally
     */
    @Override
    public void onMessage(Message message) {
        final TextMessage received = (TextMessage) message;
        queue.add(received);
    }
}
    } 

//Problem here unable to consume 
class Consumer implements Runnable { 
    ConcurrentLinkedQueue<TextMessage> queue; 

    public Consumer(ConcurrentLinkedQueue<TextMessage> queue2) { 
     this.queue = queue2; 
    } 
    public void run() { 
     TextMessage str; 
     System.out.println("Consumer Started"); 
     while ((str = queue.poll()) == null) { 
     try { 
      Thread.currentThread().sleep(500); 
     } catch (Exception ex) { 
      ex.printStackTrace(); 
     } 
     } 
     System.out.println("Removed: " + str); 
     //} 
    } 

V3 :

public static void main(String[] args) throws InterruptedException, JMSException { 

//Create a Consumer with coresize 4 and Max size 10 
     final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 10, 100, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy()); 
     executor.allowCoreThreadTimeOut(true); 

     **//INITIALIZE ACTIVEMQ CONFIGURATION HERE** 

     consumer.setMessageListener(new QueueMessageListener()); 
     executor.shutdown(); 
    } 

/** JMS listener that submits one Consumer task per delivered message to the executor. */
private static class QueueMessageListener implements MessageListener {

    /**
     * Wraps the delivered message in a Consumer and hands it to the executor.
     *
     * @param message the delivered JMS message; it is cast to TextMessage unconditionally
     */
    @Override
    public void onMessage(Message message) {
        final Consumer task = new Consumer((TextMessage) message);
        executor.execute(task);
    }
}
    } 

//Problem here unable to consume 
/**
 * Runnable that processes a single message handed to it at construction time.
 */
class Consumer implements Runnable {
    TextMessage textMessage;

    public Consumer(TextMessage textMessage) {
        this.textMessage = textMessage;
    }

    /** Prints the message this task was created for. */
    public void run() {
        // The message lives in the textMessage field; there is no `str` variable here.
        System.out.println("Removed: " + textMessage);
    }
}

V4 :

public static void main(String[] args) throws InterruptedException, JMSException { 

    new Consumer(queue).start(); 

    **//INITIALIZE ACTIVEMQ CONFIGURATION HERE** 

    consumer.setMessageListener(new QueueMessageListener()); 
    executor.shutdown(); 
} 

/** JMS listener that appends every delivered message to the externally declared queue. */
private static class QueueMessageListener implements MessageListener {

    /**
     * Adds the delivered message to the queue.
     *
     * @param message the delivered JMS message; it is cast to TextMessage unconditionally
     */
    @Override
    public void onMessage(Message message) {
        final TextMessage received = (TextMessage) message;
        queue.add(received);
    }
}

//Problem here unable to consume 
/**
 * Runnable that keeps taking messages from the queue and printing them until its thread
 * is interrupted.
 */
class Consumer implements Runnable {
    ConcurrentLinkedQueue<TextMessage> queue;

    public Consumer(ConcurrentLinkedQueue<TextMessage> queue2) {
        this.queue = queue2;
    }

    /**
     * Loops forever: pauses 500 ms, waits (polling every 500 ms) for the next message,
     * prints it. An interrupt ends the loop with the interrupt flag restored; it is no
     * longer silently ignored.
     */
    public void run() {
        System.out.println("Consumer Started");
        try {
            while (true) {
                // Pause between messages, as in the original loop.
                Thread.sleep(500);
                TextMessage str;
                while ((str = queue.poll()) == null) {
                    Thread.sleep(500);
                }
                System.out.println("Removed: " + str);
            }
        } catch (InterruptedException ex) {
            // Interruption is the only way to stop this consumer; keep the flag set.
            Thread.currentThread().interrupt();
        }
    }
}
+0

코드의 V2를 시도해 보았습니다. 프로듀서에서 ActiveMQ로 100개의 메시지를 보냈지만 System.out.println("Removed: " + str); 가 10번만 출력됩니다. 즉, 큐에서 10개의 요소만 제거됩니다. 무엇이 잘못된 것입니까? –

+0

이 경우 변수 count == 10이라는 뜻입니까? –

+0

어쩌면 V3가 더 유연합니다 –

관련 문제