2014-06-18 1 views
0

나는 다중 스레드 스프링 응용 프로그램을 가지고 있습니다. 여기서 주제 교환을 작성하고, 대기열을 선언하고, 라우팅 키를 사용하여 바인딩합니다. 메시지를 동 기적으로 보내고받습니다. 주제 교환에 메시지를 보내고 routingKey를 사용하여 메시지를 대기열에 게시 할 수 있는지 확인할 수 있습니다.토끼 MQ 동기 다중 스레드 응용 프로그램에서 보내기 및 받기

그러나 메시지를받는 동안 반복 될 때마다 큐 등록을 얻고 등록이 해제되지 않은 것으로 나타났습니다. 나는 메시지를 수신하기 위해 QueueingConsumer를 만들고 있는데 아마도 같은 것을하는 또 다른 방법이있을 수 있습니다. 알려주세요. 다음은 receiveMessage 메소드의 스 니펫입니다.

public ObjectMessage receiveMessage(final String readQueue, final UUID correlationId, final boolean isBroadcastMessage, final int readTimeout, final int readAttempts) 
{ 
    this.configurationLock.lock(); 
    this.transmissionSemaphore.release(1); 
    this.configurationLock.unlock(); 
    try 
    { 
     for (int i = 0; i < readAttempts; i++) 
     { 
      ObjectMessage returnValue = null; 
      try 
      { 
       returnValue = this.receiveMessage(readQueue, correlationId, isBroadcastMessage, readTimeout); 
      } 
      catch (final Exception e) 
      { 
       logger.error(e); 
      } 
      if (returnValue != null) 
      { 
       logger.warn("Message received from queue - " + readQueue); 
       return returnValue; 
      } 
     } 
     if (correlationId != null) 
     { 
      throw new MessageNotFoundException(correlationId); 
     } 
     return null; 
    } 
    finally 
    { 
     try 
     { 
      this.transmissionSemaphore.acquire(1); 
     } 
     catch (final InterruptedException e) 
     { 
      Thread.interrupted(); 
     } 
    } 
} 


private ObjectMessage receiveMessage(final String routingKey, final UUID correlationId, final boolean isBroadcastMessage, final int readTimeout) throws Exception 
{ 
    logger.debug("receiveMessage - routingKey:" + routingKey + ",correlationId:" + correlationId + ",isBroadcastMessage:" + isBroadcastMessage + ",readTimeout:" 
      + readTimeout); 
    this.configurationLock.lock(); 
    this.transmissionSemaphore.release(1); 
    this.configurationLock.unlock(); 

    Connection connection = null; 
    Channel channel = null; 
    QueueingConsumer consumer = null; 
    try 
    { 
     // Binding the topic exchange with queue using routing key 
     final String queueName = "clientConfigurationQueue"; 
     final CachingConnectionFactory cachingConnectionFactory = this.getCachingConnectionFactory(routingKey); 
     if (isBroadcastMessage) 
     { 
      this.declareTopicAmqpInfrastructure(cachingConnectionFactory, routingKey, queueName); 
     } 
     QueueingConsumer.Delivery delivery; 

     connection = cachingConnectionFactory.createConnection(); 
     channel = connection.createChannel(false); 

     consumer = new QueueingConsumer(channel); 

     if (correlationId == null) 
     { 
      channel.basicConsume(queueName, true, consumer); 
      delivery = consumer.nextDelivery(readTimeout); 
     } 
     else 
     { 
      channel.basicConsume(queueName, false, consumer); 
      while (true) 
      { 
       delivery = consumer.nextDelivery(readTimeout); 
       if (delivery != null) 
       { 
        final String correlationId = delivery.getProperties().getCorrelationId(); 

        if (correlationId.equals(correlationId)) 
        { 
         channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 
         break; 
        } 
        else 
        { 
         channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true); 
        } 
       } 
       else 
       { 
        break; 
       } 
      } 
     } 

     ObjectMessage objectMessage = null; 
     if (delivery != null) 
     { 
      logger.debug("Message received with correlationId - " + delivery.getProperties().getCorrelationId() + " for queue - " + queueName); 
      logger.debug("Message received with Body - " + SerializationUtils.deserialize(delivery.getBody())); 
      objectMessage = new ObjectMessage(); 
      objectMessage.setCorrelationId(delivery.getProperties().getCorrelationId()); 
      objectMessage.setMessage(delivery.getBody()); 
     } 
     else 
     { 
      logger.debug("Message not received from queueName - " + queueName); 
     } 

     return objectMessage; 
    } 
    catch (final IOException | ShutdownSignalException | ConsumerCancelledException | InterruptedException e) 
    { 
     logger.error("Unable to receive message - " + e); 
     throw new Exception(e); 
    } 
    finally 
    { 
     try 
     { 
      this.transmissionSemaphore.acquire(1); 
     } 
     catch (final InterruptedException e) 
     { 
      Thread.interrupted(); 
     } 

     try 
     { 
      if (connection != null) 
      { 
       connection.close(); 
      } 

      if (channel != null) 
      { 
       channel.close(); 
      } 
     } 
     catch (final Exception ignore) 
     { 

     } 
    } 
} 

private void declareTopicAmqpInfrastructure(final CachingConnectionFactory cachingConnectionFactory, final String routingKey, String queueName) 
{ 
    final Connection connection = cachingConnectionFactory.createConnection(); 
    final Channel channel = connection.createChannel(false); 
    try 
    { 
     channel.exchangeDeclare("topicExchange", ExchangeTypes.TOPIC, true, false, null); 
     channel.queueDeclare(queueName, true, false, false, null); 
     channel.queueBind(queueName, "topicExchange", routingKey); 
    } 
    catch (final IOException e) 
    { 
     logger.error("Unable to declare rabbit queue, exchange and binding - " + e); 
    } 
    finally 
    { 
     connection.close(); 
     try 
     { 
      channel.close(); 
     } 
     catch (final IOException ignore) 
     { 

     } 
    } 
} 
+1

설명하는 것은 나에게 의미가 없습니다. 훨씬 더 자세한 내용을 제공하십시오 (스택 추적 등). 그것을 재현하는 샘플 앱 (예 : Gist)을 게시 할 수 있다면 더 좋을 것입니다. –

+0

샘플 코드 – GRaj

답변

0

편집을 완료하면 질문이 완전히 변경되었습니다. 원래의 질문은 당신이 createConnection()에 매달려 있다고 대답했습니다. Spring AMQP를 사용한다면, 더 높은 레벨의 추상화를 사용하지 않는 것이 어떻겠습니까? 소비자를 취소 한 적이 없습니다. 소비자 태그가 basicConsume 인 것을 확인하고 완료하면 basicCancel으로 취소해야합니다.

+0

에 대한 자세한 내용을 제공합니다. 주제와 팬 아웃 교환과 관련된 AMQP 개념을 거의 이해하지 못하고 혼란 스러울만큼 혼란 스럽기 때문에 질문을 완전히 변경하는 것에 대해 죄송합니다. 더 높은 수준의 추상화로 RabbitTemplate을 사용하여 메서드를 보내고 받음을 의미합니까? – GRaj

+0

예, async 용 메시지 수신기 컨테이너가 수신합니다. –

+0

소비자 목록이 즉시 삭제되었습니다. 게리 고맙습니다. 당신은 훌륭한 스승이었습니다. – GRaj

관련 문제