0

메시지를 읽고 Elasticsearch에서이 데이터를 인덱싱하는 일부 RabbitMQ 대기열에 소비자가 있습니다. 구현은 spring-amqp를 사용하여 수행됩니다. 우리는 성능을 향상시키기 위해 소비자 수준에서 메시지를 집계하고 Elasticsearch에서 벌크 삽입을 수행 할 계획입니다 (이로 인해 성능이 향상 될 것입니다).봄 AMQP 및 Elasticsearch - 메시지 집계

구현 방법에 대해 궁금한 점이 있습니까? 또한 민감한 또 다른 문제는 응답을 처리하는 방법입니다. 각 메시지는 "reply_to"헤더를 가지며 응답 채널과 함께 인바운드 게이트웨이를 사용하므로 각 메시지에 대해 응답을 전달해야합니다.

일괄 처리 크기와 MessageGroupStore가 만료되는 기간 (그리고 물론 사신)에 기반한 릴리스 전략을 사용하여 스프링 통합의 애그리 게이터를 사용할 생각입니다. 인바운드 게이트웨이에는 20 개의 작업 실행자와 20 개의 프리 페치 카운트가 있습니다. 요청이 올 때마다 메시지가 그룹 저장소에 추가되고 canRelease() 조건이 정상일 때 요청과 함께 제공된 스레드 중 하나가 대량 작업을 수행합니다. 그러나 내가 결코 쓰지 않을 응답을 기다려야하는 다른 스레드들과 어떻게 할 것인가. 또한 크고 집계 된 메시지에 대한 응답을 중단하는 방법을 알지 못하므로 각각의 작은 요청마다 응답이 제공됩니다.

또 다른 문제는 어떻게 메시지를 확인합니까? 내가 읽은 트랜잭션에서 RabbitMQ의 성능이 저하 될 것이므로 "tx-size"특성을 사용하는 것을 기쁘게 생각하지 않습니다. 또한 시간 초과가 너무 작 으면이 속성이 잘못 계산 될 수 있습니다.

+1

해결책 인바운드 게이트웨이를 인바운드 및 아웃 바운드 채널 어댑터로 바꿉니다. 인바운드 채널 어댑터를 통해 메시지를 수신하면 체인이 애그리 게이터로 계속됩니다. 애그리 게이터에서 서비스 활성화자가 일괄 처리 메시지를받습니다. 응답 채널에 큰 응답이 추가됩니다. 여기서 큰 메시지를 작은 메시지로 분리하고 수동으로 메시지를 확인해야합니다.모든 reply_to 헤더를 유지하는데도주의를 기울여야합니다. 모든 작은 응답 메시지를 보낸 후 routing-key-expression = headers [ "reply_to"]를 사용하여 아웃 바운드 채널 어댑터에 전달합니다. –

+0

이것이 SI와 spring-amqp로 할 수 있는지 나는 정말로 모른다. 집계 된 메시지의 헤더를 어떻게 든 풍성하게하거나 집계 방식으로 수동으로 만들어야합니다. –

답변

1

소비자와 애그리 게이터 (aggregator)에 관한 질문에 대한 답변 :

는 config는 AMQP 및 집계에서 메시지를 사용할 수 있습니다. 집계 전략은 Transction을 기반으로 커밋 :

<amqp:inbound-channel-adapter queue-names="myQueue" 
            transaction-manager="transactionManager" 
            channel-transacted="true" 
            channel="aggregateChannel" 
            advice-chain="aggregatorReaperAdvice" 
            concurrent-consumers="4" 
            tx-size="100"/> 

<aggregator input-channel="aggregateChannel" output-channel="storeChannel" 
       expire-groups-upon-completion="true" 
       correlation-strategy-expression="T(Thread).currentThread().id" 
       release-strategy-expression="^[payload.equals(@AGGREGATOR_RELEASE_MARK)] != null" 
       expression="?[!payload.equals(@AGGREGATOR_RELEASE_MARK)].![payload]"/> 

ReaperAdvice (그루비 코드) :

@Service 
class AggregatorReaperAdvice implements MethodBeforeAdvice, InitializingBean { 

    private static final TRANSACTION_RESOURCE_MARK = 'TRANSACTION_RESOURCE_MARK' 

    public static final AGGREGATOR_RELEASE_MARK = 'AGGREGATOR_RELEASE_MARK' 

    MessagingTemplate messagingTemplate 

    @Autowired 
    MessageChannel aggregateChannel 

    @Override 
    void afterPropertiesSet() throws Exception { 
     Assert.notNull aggregateChannel, "aggregateChannel must not be null" 
     messagingTemplate = new MessagingTemplate(aggregateChannel) 
    } 

    @Override 
    void before(Method method, Object[] args, Object target) { 
     if (!TransactionSynchronizationManager.hasResource(AggregatorReaperAdvice)) { 
      TransactionSynchronizationManager.bindResource(AggregatorReaperAdvice, TRANSACTION_RESOURCE_MARK) 
      TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { 

       @Override 
       void beforeCommit(boolean readOnly) { 
        messagingTemplate.send(MessageBuilder.withPayload(AGGREGATOR_RELEASE_MARK).build()) 
       } 

       @Override 
       void afterCompletion(int status) { 
        TransactionSynchronizationManager.unbindResource(AggregatorReaperAdvice) 
       } 

      }) 
     } 
    } 
} 

가 명확하지 않은 경우 알려줘.

다른 모든 질문은 곧 해결 될 것입니다.

manual ack의 경우 이전의 모든 메시지에 대해 deliveryTagchannel.basicAck(deliveryTag, true); ~ ack을 사용할 수 있습니다. headers["reply_to"] 경우를 들어

... 난 당신이 사용자 정의 AbstractAggregatingMessageGroupProcessor aggregator에 대한을 제공하고 두 마리를 죽일해야한다고 생각 : MessageGroup.getMessages() 이상 애그리 게이터 (aggregator)와 반복의 누적 결과가 제공 MessageChannel에 대한 응답 과정에 대한 그들 각각을 보낼 수 있습니다. 그것은 귀하의 경우에 대한 빠른 해결책입니다.

유사하지만 좀 더 느슨하게 연결된 솔루션은 어 그리 게이터 및 해당 MessageGroupStore의 결과를 기반으로합니다. correlationKey을 추출하여 그룹 및 해당 메시지를 검색하여 원하는 reply 논리를 수행 할 수 있습니다. 이 경우 어 그리 게이터가있는 그룹을 그룹에서 제거하지 말고 그룹 검색 후 수동으로 제거해야합니다.

+0

안녕 Artem. 응답 해주셔서 감사합니다. 따라서 제 생각은 제가 이해하는 것으로부터 당신에게 잘 어울립니다. d. 당신이 정확하게 언급했듯이, AggregatorReaperAdvice 코드는 나에게 분명하지 않습니다. 여기서 거래 지점을 설명해 주시겠습니까? 다음과 같은 시나리오 때문에 tx 크기를 설정하는 것을 좋아하지 않습니다. prefetch-count = tx-size = 100 (prefetch0count

+0

음, 내 AggregatorReaperAdvice가 Listener에 적용됩니다. '프로세스를 거치면 TX 내에있게됩니다. 그것은 TX와 연결된'TXSync'를 등록하고 인공 수집기'release'는'MARK' 메시지를 보냅니다. 이것을 갖는 것은 정확히'tx-size' 메시지 이하를 TX 내에서 모은 것입니다. RAM에 대해 걱정하기 때문에 'tx-size'와 'concurency'로 게임을 할 수 있습니다. 그래서, 당신이 허용하는 것보다 더 많은 청취자 프로세스가 없을 것입니다. 리스너 스레드에서 다른 다운 스트림 흐름을 수행하여 스레드를 차단하고 새 리스너 태스크를 시작할 수 없도록해야합니다. –

+0

끈질 기게해서 죄송합니다.하지만 3 가지 더 질문이 있습니다. 1) 스레드 ID를 기반으로 상관 전략을 갖는 이점은 무엇입니까? 4 명의 소비자가 있으면 _SimpleMessageGroup_이 1 개인 _SimpleMessageStore_를 갖게됩니다. 2) 예외가 발생하면 트랜잭션이 롤백됩니다. 서비스 액티베이터로 errorchannel을 구성한 경우 (우리는 항상 예외가 발생했음을 나타내는 응답을 반환하지만 메시지를 다시 전달하지 않으려는 경우) 여기에도 여전히 failedMessage가 수신됩니까? 나는 메시지가 재구성 될 것이라고 생각한다. –