내가 진행중인 프로젝트에서 SQS의 메시지를 읽어야하며 Akka를 사용하여 이러한 메시지의 처리를 분산하기로 결정했습니다.Akka, SQS 및 Camel의 소비자 투표율
SQS는 Camel의 지원을 받고 있으며 Consumer 클래스의 Akka에서 사용할 수있는 기능이 내장되어 있으므로 많은 예제를 보지 못했지만이 방법으로 끝점을 구현하고 메시지를 읽는 것이 가장 좋을 것이라고 생각했습니다. 그렇게하는 사람들.
제 문제는 대기열을 빈 채로 또는 비어있는 채로 유지할 수있을만큼 빠르게 대기열을 폴링 할 수 없다는 것입니다. 내가 원래 생각한 것은 소비자가 Camel을 통해 SQS에서 X/s의 속도로 메시지를받을 수 있다는 것입니다. 거기에서 나는 더 많은 소비자를 만들어서 처리 된 메시지가 필요한 비율까지 올릴 수있었습니다.
내 소비자가 :
같이import akka.camel.{CamelMessage, Consumer}
import akka.actor.{ActorRef, ActorPath}
class MyConsumer() extends Consumer {
def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)"
var count = 0
def receive = {
case msg: CamelMessage => {
count += 1
}
case _ => {
println("Got something else")
}
}
override def postStop(){
println("Count for actor: " + count)
}
}
, 나는 메시지의 속도를 개선하기 위해 delay=1
뿐만 아니라 &maxMessagesPerPoll=10
를 설정했지만, 나는 같은 엔드 포인트와 다수의 소비자를 생성 할 수 없습니다입니다.
나는 By default endpoints are assumed not to support multiple consumers.
과 내가이 여러 소비자를 산란하는 나에게 하나 개의 소비를 줄 것 같은 곳에 잠시 시스템을 실행 한 후, 출력 메시지가 대신 Count for actor: x
입니다뿐만 아니라, SQS 엔드 포인트에 대한 사실이 보유하고 생각하는의 문서를 읽기로 다른 출력은 Count for actor: 0
입니다.
이것이 모두 유용한 경우; 단일 소비자에 대한 현재 구현으로 초당 약 33 개의 메시지를 읽을 수 있습니다.
Akka의 SQS 대기열에서 메시지를 읽는 올바른 방법입니까? 그렇다면이 방법을 사용하여 외부로 확장 할 수 있으므로 메시지 사용 속도를 900 메시지/초에 근접 할 수 있습니다.