나는 다중 스레드 스프링 응용 프로그램을 가지고 있습니다. 여기서 주제 교환을 작성하고, 대기열을 선언하고, 라우팅 키를 사용하여 바인딩합니다. 메시지를 동 기적으로 보내고받습니다. 주제 교환에 메시지를 보내고 routingKey를 사용하여 메시지를 대기열에 게시 할 수 있는지 확인할 수 있습니다.토끼 MQ 동기 다중 스레드 응용 프로그램에서 보내기 및 받기
그러나 메시지를받는 동안 반복 될 때마다 큐 등록을 얻고 등록이 해제되지 않은 것으로 나타났습니다. 나는 메시지를 수신하기 위해 QueueingConsumer를 만들고 있는데 아마도 같은 것을하는 또 다른 방법이있을 수 있습니다. 알려주세요. 다음은 receiveMessage 메소드의 스 니펫입니다.
public ObjectMessage receiveMessage(final String readQueue, final UUID correlationId, final boolean isBroadcastMessage, final int readTimeout, final int readAttempts)
{
this.configurationLock.lock();
this.transmissionSemaphore.release(1);
this.configurationLock.unlock();
try
{
for (int i = 0; i < readAttempts; i++)
{
ObjectMessage returnValue = null;
try
{
returnValue = this.receiveMessage(readQueue, correlationId, isBroadcastMessage, readTimeout);
}
catch (final Exception e)
{
logger.error(e);
}
if (returnValue != null)
{
logger.warn("Message received from queue - " + readQueue);
return returnValue;
}
}
if (correlationId != null)
{
throw new MessageNotFoundException(correlationId);
}
return null;
}
finally
{
try
{
this.transmissionSemaphore.acquire(1);
}
catch (final InterruptedException e)
{
Thread.interrupted();
}
}
}
private ObjectMessage receiveMessage(final String routingKey, final UUID correlationId, final boolean isBroadcastMessage, final int readTimeout) throws Exception
{
logger.debug("receiveMessage - routingKey:" + routingKey + ",correlationId:" + correlationId + ",isBroadcastMessage:" + isBroadcastMessage + ",readTimeout:"
+ readTimeout);
this.configurationLock.lock();
this.transmissionSemaphore.release(1);
this.configurationLock.unlock();
Connection connection = null;
Channel channel = null;
QueueingConsumer consumer = null;
try
{
// Binding the topic exchange with queue using routing key
final String queueName = "clientConfigurationQueue";
final CachingConnectionFactory cachingConnectionFactory = this.getCachingConnectionFactory(routingKey);
if (isBroadcastMessage)
{
this.declareTopicAmqpInfrastructure(cachingConnectionFactory, routingKey, queueName);
}
QueueingConsumer.Delivery delivery;
connection = cachingConnectionFactory.createConnection();
channel = connection.createChannel(false);
consumer = new QueueingConsumer(channel);
if (correlationId == null)
{
channel.basicConsume(queueName, true, consumer);
delivery = consumer.nextDelivery(readTimeout);
}
else
{
channel.basicConsume(queueName, false, consumer);
while (true)
{
delivery = consumer.nextDelivery(readTimeout);
if (delivery != null)
{
final String correlationId = delivery.getProperties().getCorrelationId();
if (correlationId.equals(correlationId))
{
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
break;
}
else
{
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
}
}
else
{
break;
}
}
}
ObjectMessage objectMessage = null;
if (delivery != null)
{
logger.debug("Message received with correlationId - " + delivery.getProperties().getCorrelationId() + " for queue - " + queueName);
logger.debug("Message received with Body - " + SerializationUtils.deserialize(delivery.getBody()));
objectMessage = new ObjectMessage();
objectMessage.setCorrelationId(delivery.getProperties().getCorrelationId());
objectMessage.setMessage(delivery.getBody());
}
else
{
logger.debug("Message not received from queueName - " + queueName);
}
return objectMessage;
}
catch (final IOException | ShutdownSignalException | ConsumerCancelledException | InterruptedException e)
{
logger.error("Unable to receive message - " + e);
throw new Exception(e);
}
finally
{
try
{
this.transmissionSemaphore.acquire(1);
}
catch (final InterruptedException e)
{
Thread.interrupted();
}
try
{
if (connection != null)
{
connection.close();
}
if (channel != null)
{
channel.close();
}
}
catch (final Exception ignore)
{
}
}
}
private void declareTopicAmqpInfrastructure(final CachingConnectionFactory cachingConnectionFactory, final String routingKey, String queueName)
{
final Connection connection = cachingConnectionFactory.createConnection();
final Channel channel = connection.createChannel(false);
try
{
channel.exchangeDeclare("topicExchange", ExchangeTypes.TOPIC, true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, "topicExchange", routingKey);
}
catch (final IOException e)
{
logger.error("Unable to declare rabbit queue, exchange and binding - " + e);
}
finally
{
connection.close();
try
{
channel.close();
}
catch (final IOException ignore)
{
}
}
}
설명하는 것은 나에게 의미가 없습니다. 훨씬 더 자세한 내용을 제공하십시오 (스택 추적 등). 그것을 재현하는 샘플 앱 (예 : Gist)을 게시 할 수 있다면 더 좋을 것입니다. –
샘플 코드 – GRaj