2014-11-25 3 views
0

나는 RabbitMQ 브로커를 가지고 있는데, Elasticsearch에서 문서로 끝나는 다른 메시지를 게시합니다. 브로커의 여러 소비자가 있습니다. 실제로는 amqp 인바운드 게이트웨이에 할당 된 작업 실행자에서 서로 다른 스레드입니다 (여기서는 스프링 통합과 amqp를 사용).큐 메커니즘 및 Elasticsearch 1.4.0

다음과 같은 시나리오를 생각해보십시오 : 나는 구조

{ 
    "field1" : "value1", 
    "field2" : "value2" 
} 

와 ES의 문서를 만든 이후 나는이 명 업데이트 요청, 같은 필드를 업데이트를 모두 보내, 이제 field1을 가정 해 봅시다. 이 메시지를 다른 하나 (생산의 일반적인 유스 케이스)로 바로 보내면 소비자 스레드가 올바른 순서로 메시지를 가져 오지만 (amqp는 허용 함) 처리가 잘못된 순서로 발생할 수 있으며 나중에 업데이트 된 값이 될 수 있습니다. 첫 번째 것에 의해 덮어 씌여진다. 나는 최종 데이터를 가지고 결국 것입니다.

데이터가 손상되지 않도록하려면 어떻게해야합니까? => 하나의 단일 소비자 스레드 만 있으면 충분하지 않습니다. 소비하는 응용 프로그램과 함께 더 많은 컴퓨터를 추가하여 확장하려는 경우에도 여전히 여러 소비자가있을 것이기 때문입니다. 메시지의 순서가 필요할 수도 있지만 여러 대의 컴퓨터가있을 경우 클러스터 인식 구성 요소를 만들어야 할 필요가있을 것입니다. SI를 사용하고 있기 때문에 제 생각에는 그렇게하기가 어렵습니다.

이전 1.2 버전의 ES에서는 타임 스탬프와 같은 외부 버전을 사용했는데 내 시나리오에는 VersionConflictException이 던졌습니다. 첫 번째 업데이트에는 10000 버전이 있었을 것입니다. 먼저 처리되었으므로 ES는 기존 요청보다 낮은 버전 10000으로 요청을 거부합니다. 하지만 최신 버전에서 업데이트 작업을 수행하는 ES 팀 have removed this functionality.

답변

1

하나의 솔루션은 여러 대기열을 사용하고 각 대기열에 단일 소비자를 배치하는 것일 수 있습니다. 해시 기능을 사용하여 업데이트를 동일한 문서로 항상 동일한 큐로 라우트하십시오. 다양한 옵션에 대해서는 RabbitMQ Tutorials을 참조하십시오.

대기열을 추가하고 해시 기능을 변경하여 수평 확장 할 수 있습니다.

탄력성을 높이려면 소비자를 Spring XD으로 실행하는 것을 고려하십시오. 각 큐에 대해 각 토끼 소스의 단일 인스턴스를 가질 수 있으며 XD는 다른 컨테이너 노드가 다운 될 경우이를 실패로 처리합니다.

auto-startup="false"으로 구성된 웜 대기 - 인바운드 어댑터를 사용하고 모니터가있는 사용자가 직접 롤업하고 <control-bus/>을 사용하여 활성 인스턴스가 다운 된 경우 새 인스턴스를 시작할 수 있습니다.

편집 : 아래의 네 번째 의견에 대한 응답으로

.

위에서 말했듯이 수평 확장하려면 해시 함수를 변경해야합니다. 따라서 실행 중에 자동으로 소비자를 추가하는 것은 까다로울 수 있습니다.

jar에 큐 이름을 하드 코딩 할 필요가 없으며 속성 자리 표시자를 사용하고 속성, 시스템 속성 또는 환경 변수에서 채울 수 있습니다.

이 솔루션은 가장 간단하지만 이러한 제한 사항이 있습니다.

그러나 생산자를 중지하고 모든 대기열이 정지 할 때까지 기다렸다가 소비자를 다시 구성하고 제작자를 다시 시작할 수있는 관리 응용 프로그램을 만들 수 있습니다. - Spring Integration은 어댑터 시작/중지에 <control-bus/>을 제공합니다. JMX를 통해이 작업을 수행 할 수도 있습니다.

대체 솔루션이 가능하지만 일반적으로 (아마도 사육사 등을 사용하여) 클러스터를 통해 공유 상태를 유지해야하므로 훨씬 더 복잡합니다. 여전히은 경쟁 조건을 처리해야합니다 (두 번째 업데이트는 처음 소비자에게 도착할 수 있습니다).

+0

spring-amqp와 함께이 해시 함수를 주입하려면 어떻게해야합니까? 간단한 예를 들어 주시겠습니까? –

+0

문서의 해시를 어떻게 든 계산합니다 (예 : 'customerNumber % 3' (3 큐의 경우)를 사용하여'rabbitTemplate.send ... (...)'메소드에서'routingKey'를 빌드하십시오. –

+0

내가 3 개의 다른 대기열에 게시하고 각각의 대기열에 대해 1 개의 소비자를 등록한다고 가정 해 봅시다. 내 앱을 배포하는 3 대의 컴퓨터가있는 경우 어떻게 하나의 스레드 만 대기열에서 메시지를 가져올 수 있습니까? –

0

일관성 검사에 기본 메커니즘을 사용할 수 있습니다. 기본적으로 업데이트하고있는 버전의 최신 버전을 가지고 있는지 확인하고 싶습니다.

그래서 그 개체에 _version을 가져와야합니다. 쿼리에서 최상위 버전을 version = true로 설정하면됩니다. 그러면 _version이 조회 결과와 함 2 리턴됩니다. 그런 다음 업데이트를 수행 할 때 url의 version 매개 변수를 사용자가 보유한 값으로 설정하기 만하면 일치하지 않는 경우 버전 충돌이 발생합니다.

Nicer는 클로저를 사용한 업데이트를 처리합니다. 기본적으로 이것은 다음과 같이 작동합니다. id로 객체를 가져 오는 update 메소드를 사용하고, 수정하려는 객체를 캡슐화하는 closure (업데이트 함수에 매개 변수)를 적용한 다음 수정 된 객체를 저장합니다. 여전히 가능한 버전 충돌을 잡아 내면 단순히 객체를 다시 가져 와서 객체에 클로저를 다시 적용하면됩니다. 우리는 이것을 수행하고 재시도 전에 임의의 수면을 추가했습니다. 이는 여러 번의 업데이트가 실패 할 확률을 크게 줄이고 좋은 디자인 패턴입니다. 읽기 및 쓰기를 함께 유지하면 충돌 가능성을 최소화하고 잠자기 전에 다시 시도하면 문제가 최소화됩니다. 위험을 줄이기 위해 여러 번 재 시도를 추가 할 수 있습니다.

+0

이것은 모든 업데이트 작업에 대한 호출을 의미합니다. 그리고 동일한 문서에 동시 업데이트가 발생하고 ms 간격으로 발생하는 경우 어떻게해야합니까? 이로 인해 데이터가 손상되지 않습니까? 또 다른 질문 : 동일한 필드에 2 개의 업데이트 요청이 있습니다. 두 번째가 적용된 최신 것이되기를 원할 것입니다. 이 재시도를 통해 첫 번째 업데이트가 마지막으로 적용될 변경 사항이 있습니까? –

+0

동시 업데이트가있는 경우 어떤 경우 든 덮어 쓰기 전에 최신 버전인지 확인해야합니다. 가장 좋은 방법은 PUT하기 바로 전에 GET을하는 것입니다. 우리의 경우, 우리가 그렇게하지 않으면, 우리는 적재 적소에 버전 충돌을 꽤 많이 겪는다. 동시 업데이트가 예상보다 다른 순서로 적용될 가능성이 있습니다. 이 모든 패턴 보장은 1) 버전을 es에서 업데이트하는 것입니다.2) 다른 업데이트가 동시에 작성되지 않습니다. 그것은 낙관적 인 잠금의 한 형태입니다. 물론 종결에 추가 수표를 넣을 수 있습니다. –