0

Spring과 RabbitMQ를 통합해 사용하고 있습니다. 메시지는 큐로 전송되지만 Ready(준비) 상태로 남거나 Unacked(미확인) 상태가 될 뿐, 컨슈머가 메시지를 가져가지 않습니다. (제목: Spring Integration RabbitMQ – 메시지가 Ready 상태로 남음)

리스너 XML 구성은 다음과 같습니다:

<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:int="http://www.springframework.org/schema/integration" 
    xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp" 
    xmlns:rabbit="http://www.springframework.org/schema/rabbit" 
    xmlns:task="http://www.springframework.org/schema/task" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xsi:schemaLocation=" 
    http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans.xsd 
    http://www.springframework.org/schema/integration 
    http://www.springframework.org/schema/integration/spring-integration.xsd 
    http://www.springframework.org/schema/integration/amqp 
    http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd 
    http://www.springframework.org/schema/rabbit 
    http://www.springframework.org/schema/rabbit/spring-rabbit.xsd 
    http://www.springframework.org/schema/task 
    http://www.springframework.org/schema/task/spring-task.xsd 
    http://www.springframework.org/schema/context 
    http://www.springframework.org/schema/context/spring-context.xsd"> 

<!-- Resolves the ${...} placeholders used below from the classpath properties file.
     ignore-unresolvable="true" lets placeholders that are missing from this file pass through
     unresolved instead of failing; order="3" positions this configurer relative to any others. -->
<context:property-placeholder location="classpath:spring/prj-rabbitmq-context-thirdparty.properties" ignore-unresolvable="true" order="3"/> 

<!-- RabbitMQ connection factory shared by the template, admin, transaction manager and
     listener container in this file. Broker addresses and credentials come from properties;
     connection-timeout is 5000 (milliseconds per the Spring AMQP schema documentation). -->
<rabbit:connection-factory 
     id="prjRabbitmqConnectionFactory" 
     addresses="${rabbitmq.addresses}" 
     username="${rabbitmq.username}" 
     password="${rabbitmq.password}" 
     connection-timeout="5000" /> 

<!-- Transaction manager bound to the RabbitMQ connection factory; it is referenced by the
     transaction-manager attribute of the listener container in this file. -->
<bean id="rabbitTxManager" 
     class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager"> 
    <property name="connectionFactory" ref="prjRabbitmqConnectionFactory"/> 
</bean> 

<!-- AMQP template used by the outbound channel adapter to publish messages. Payloads are
     converted with serializerMessageConverter and template operations are wrapped in
     retryTemplate. -->
<rabbit:template 
     id="prjRabbitmqTemplate" 
     connection-factory="prjRabbitmqConnectionFactory" 
     message-converter="serializerMessageConverter" 
     retry-template="retryTemplate" /> 

<!-- Retry template with exponential back-off: the first wait is 1000, each following wait is
     multiplied by 3, and waits are capped at 10000 (ExponentialBackOffPolicy intervals are in
     milliseconds). No retryPolicy is configured here, so RetryTemplate's default policy applies. -->
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate"> 
    <property name="backOffPolicy"> 
     <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy"> 
      <property name="initialInterval" value="1000" /> 
      <property name="multiplier" value="3" /> 
      <property name="maxInterval" value="10000" /> 
     </bean> 
    </property> 
</bean> 

<!-- RabbitAdmin on the shared connection factory; it is what declares the queue, exchange and
     binding beans of this context on the broker. auto-startup is set explicitly to "true". -->
<rabbit:admin 
     id="prjRabbitmqAdmin" 
     auto-startup="true" 
     connection-factory="prjRabbitmqConnectionFactory" /> 

<!-- Durable queue whose broker-side name comes from ${prj.sync.queue}; the bean id prjSyncQueue
     is what the exchange binding refers to.
     NOTE(review): the x-ha-policy queue argument was the pre-3.0 RabbitMQ way to request
     mirroring; newer brokers configure mirroring through policies instead. Confirm the broker
     version before relying on it, and note that changing arguments of an already-declared
     queue requires the queue to be re-created. -->
<rabbit:queue 
     id="prjSyncQueue" 
     name="${prj.sync.queue}" 
     durable="true"> 
    <rabbit:queue-arguments> 
     <entry key="x-ha-policy" value="all" /> 
    </rabbit:queue-arguments> 
</rabbit:queue> 


<!-- Listener container that consumes from the queue named ${prj.sync.queue} and hands each
     converted message to prjProcessorService.processMessage.
     Settings: automatic acknowledgement, transacted channel managed by rabbitTxManager,
     between 1 and 2 concurrent consumers, and rejected messages are requeued.
     NOTE(review): consumers run on prjSyncExecutor. Each consumer holds a thread for as long
     as it is running, so the executor must be able to start at least max-concurrency threads
     immediately; a consumer task that lands in the executor's queue would not start
     consuming. Verify the ${prj.sync.concurrency.*} values against this.
     NOTE(review): with requeue-rejected="true" a message whose listener keeps failing is
     redelivered repeatedly; confirm that this is intended. -->
<rabbit:listener-container 
     connection-factory="prjRabbitmqConnectionFactory" 
     acknowledge="auto" 
     channel-transacted="true" 
     transaction-manager="rabbitTxManager" 
     task-executor="prjSyncExecutor" 
     concurrency="1" 
     max-concurrency="2" 
     requeue-rejected="true" 
     message-converter="serializerMessageConverter"> 
    <rabbit:listener 
      ref="prjProcessorService" 
      queue-names="${prj.sync.queue}" method="processMessage" /> 
</rabbit:listener-container> 

<!-- Thread pool used by the listener container. Pool size range, keep-alive and queue
     capacity all come from properties. With rejection-policy CALLER_RUNS, a task that cannot
     be accepted by the pool or its queue is executed on the submitting thread. -->
<task:executor id="prjSyncExecutor" 
       pool-size="${prj.sync.concurrency.min}-${prj.sync.concurrency.max}" 
       keep-alive="${prj.sync.concurrency.keep-alive}" 
       queue-capacity="${prj.sync.concurrency.queue}" 
       rejection-policy="CALLER_RUNS"/> 
<!-- Message channel connecting prjGateway (producer side) to the AMQP outbound adapter. -->
<int:channel 
     id="prjChannel" /> 

<!-- Publishes every message arriving on prjChannel through prjRabbitmqTemplate to the
     exchange prjSyncExchange with routing key prj-event, using persistent delivery mode
     by default. -->
<int-amqp:outbound-channel-adapter 
     channel="prjChannel" 
     amqp-template="prjRabbitmqTemplate" 
     exchange-name="prjSyncExchange" 
     routing-key="prj-event" 
     default-delivery-mode="PERSISTENT" /> 


<!-- Direct exchange that routes messages with key prj-event to the prjSyncQueue bean; the
     key matches the routing-key of the outbound channel adapter. -->
<rabbit:direct-exchange 
     name="prjSyncExchange"> 
    <rabbit:bindings> 
     <rabbit:binding 
       queue="prjSyncQueue" 
       key="prj-event" /> 
    </rabbit:bindings> 
</rabbit:direct-exchange> 

<!-- Messaging gateway proxy for prjEnrichmentGateway: invoking its send method places the
     request on prjChannel, from where the outbound adapter publishes it. -->
<int:gateway 
     id="prjGateway" 
     service-interface="ro.oss.niinoo.thirdparty.prj.gateway.prjEnrichmentGateway"> 
    <int:method 
      name="send" 
      request-channel="prjChannel"/> 
</int:gateway> 

<!-- Listener target: the listener container invokes processMessage on this bean. -->
<bean id="prjProcessorService" class="ro.oss.niinoo.thirdparty.prj.processor.impl.prjEnrichmentProcessorImpl" /> 
<!-- Project-specific message converter shared by the template (publishing) and the listener
     container (consuming). -->
<bean id="serializerMessageConverter" class="ro.oss.niinoo.thirdparty.prj.serializer.prjSerializer"/>

서버를 다시 시작하면 첫 번째 메시지는 처리되지만, 이후 호출부터는 메시지가 큐에 쌓이기만 합니다. 왜 이런 일이 일어나는지 아시는 분 계신가요?

감사합니다. 다니엘

편집 :

소비자 코드 :

public class JsonEnrichmentService implements EnrichmentService { 

@Resource 
private UserQueryService userQueryService; 

@Resource 
private SecurityContextService securityContextService; 

/**
 * Processes a single record; the body shown here only prints the record to standard output.
 * NOTE(review): presumably this is the method the listener configuration refers to via
 * method="processMessage" (confirm that this class is the configured listener bean).
 *
 * @param record the record to process
 */
@Override 
public void processMessage(POJO record) { 
    System.out.println(record); 
} 

이 메서드는 트랜잭션 어노테이션이 붙은 새로운 서비스를 호출합니다.

답변

0

제 경험상 이런 현상은 보통 리스너 스레드가 사용자 코드 어딘가에서 "멈춰(stuck)" 있기 때문에 발생합니다. 스레드 덤프를 떠서 리스너 스레드가 무엇을 하고 있는지 확인해 보세요.

+0

저도 그렇게 생각해서 컨슈머 코드에 System.out.println 하나만 넣어 보았지만, 아무것도 달라지지 않았습니다. –

+0

말씀드렸듯이, 스레드 덤프를 확인해 보셔야 합니다. –

관련 문제