소비자와 애그리 게이터 (aggregator)에 관한 질문에 대한 답변 :
는 config는 AMQP 및 집계에서 메시지를 사용할 수 있습니다. 집계 전략은 Transction을 기반으로 커밋 :
<amqp:inbound-channel-adapter queue-names="myQueue"
transaction-manager="transactionManager"
channel-transacted="true"
channel="aggregateChannel"
advice-chain="aggregatorReaperAdvice"
concurrent-consumers="4"
tx-size="100"/>
<aggregator input-channel="aggregateChannel" output-channel="storeChannel"
expire-groups-upon-completion="true"
correlation-strategy-expression="T(Thread).currentThread().id"
release-strategy-expression="^[payload.equals(@AGGREGATOR_RELEASE_MARK)] != null"
expression="?[!payload.equals(@AGGREGATOR_RELEASE_MARK)].![payload]"/>
ReaperAdvice
(그루비 코드) :
@Service
class AggregatorReaperAdvice implements MethodBeforeAdvice, InitializingBean {
private static final TRANSACTION_RESOURCE_MARK = 'TRANSACTION_RESOURCE_MARK'
public static final AGGREGATOR_RELEASE_MARK = 'AGGREGATOR_RELEASE_MARK'
MessagingTemplate messagingTemplate
@Autowired
MessageChannel aggregateChannel
@Override
void afterPropertiesSet() throws Exception {
Assert.notNull aggregateChannel, "aggregateChannel must not be null"
messagingTemplate = new MessagingTemplate(aggregateChannel)
}
@Override
void before(Method method, Object[] args, Object target) {
if (!TransactionSynchronizationManager.hasResource(AggregatorReaperAdvice)) {
TransactionSynchronizationManager.bindResource(AggregatorReaperAdvice, TRANSACTION_RESOURCE_MARK)
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
void beforeCommit(boolean readOnly) {
messagingTemplate.send(MessageBuilder.withPayload(AGGREGATOR_RELEASE_MARK).build())
}
@Override
void afterCompletion(int status) {
TransactionSynchronizationManager.unbindResource(AggregatorReaperAdvice)
}
})
}
}
}
가 명확하지 않은 경우 알려줘.
다른 모든 질문은 곧 해결 될 것입니다.
manual ack
의 경우 이전의 모든 메시지에 대해 deliveryTag
에 channel.basicAck(deliveryTag, true);
~ ack
을 사용할 수 있습니다. headers["reply_to"]
경우를 들어
... 난 당신이 사용자 정의
AbstractAggregatingMessageGroupProcessor
aggregator
에 대한을 제공하고 두 마리를 죽일해야한다고 생각 :
MessageGroup.getMessages()
이상 애그리 게이터 (aggregator)와 반복의 누적 결과가 제공
MessageChannel
에 대한 응답 과정에 대한 그들 각각을 보낼 수 있습니다. 그것은 귀하의 경우에 대한 빠른 해결책입니다.
유사하지만 좀 더 느슨하게 연결된 솔루션은 어 그리 게이터 및 해당 MessageGroupStore
의 결과를 기반으로합니다. correlationKey
을 추출하여 그룹 및 해당 메시지를 검색하여 원하는 reply
논리를 수행 할 수 있습니다. 이 경우 어 그리 게이터가있는 그룹을 그룹에서 제거하지 말고 그룹 검색 후 수동으로 제거해야합니다.
해결책 인바운드 게이트웨이를 인바운드 및 아웃 바운드 채널 어댑터로 바꿉니다. 인바운드 채널 어댑터를 통해 메시지를 수신하면 체인이 애그리 게이터로 계속됩니다. 애그리 게이터에서 서비스 활성화자가 일괄 처리 메시지를받습니다. 응답 채널에 큰 응답이 추가됩니다. 여기서 큰 메시지를 작은 메시지로 분리하고 수동으로 메시지를 확인해야합니다.모든 reply_to 헤더를 유지하는데도주의를 기울여야합니다. 모든 작은 응답 메시지를 보낸 후 routing-key-expression = headers [ "reply_to"]를 사용하여 아웃 바운드 채널 어댑터에 전달합니다. –
이것이 SI와 spring-amqp로 할 수 있는지 나는 정말로 모른다. 집계 된 메시지의 헤더를 어떻게 든 풍성하게하거나 집계 방식으로 수동으로 만들어야합니다. –