2013-11-04 2 views
6

내가 진행중인 프로젝트에서 SQS의 메시지를 읽어야하며 Akka를 사용하여 이러한 메시지의 처리를 분산하기로 결정했습니다.Akka, SQS 및 Camel의 소비자 투표율

SQS는 Camel의 지원을 받고 있으며 Consumer 클래스의 Akka에서 사용할 수있는 기능이 내장되어 있으므로 많은 예제를 보지 못했지만이 방법으로 끝점을 구현하고 메시지를 읽는 것이 가장 좋을 것이라고 생각했습니다. 그렇게하는 사람들.

제 문제는 대기열을 빈 채로 또는 비어있는 채로 유지할 수있을만큼 빠르게 대기열을 폴링 할 수 없다는 것입니다. 내가 원래 생각한 것은 소비자가 Camel을 통해 SQS에서 X/s의 속도로 메시지를받을 수 있다는 것입니다. 거기에서 나는 더 많은 소비자를 만들어서 처리 된 메시지가 필요한 비율까지 올릴 수있었습니다.

내 소비자가 :

같이
import akka.camel.{CamelMessage, Consumer} 
import akka.actor.{ActorRef, ActorPath} 

class MyConsumer() extends Consumer { 
    def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)" 
    var count = 0 

    def receive = { 
    case msg: CamelMessage => { 
     count += 1 
    } 
    case _ => { 
     println("Got something else") 
    } 
    } 

    override def postStop(){ 
    println("Count for actor: " + count) 
    } 
} 

, 나는 메시지의 속도를 개선하기 위해 delay=1뿐만 아니라 &maxMessagesPerPoll=10를 설정했지만, 나는 같은 엔드 포인트와 다수의 소비자를 생성 할 수 없습니다입니다.

나는 By default endpoints are assumed not to support multiple consumers.과 내가이 여러 소비자를 산란하는 나에게 하나 개의 소비를 줄 것 같은 곳에 잠시 시스템을 실행 한 후, 출력 메시지가 대신 Count for actor: x입니다뿐만 아니라, SQS 엔드 포인트에 대한 사실이 보유하고 생각하는의 문서를 읽기로 다른 출력은 Count for actor: 0입니다.

이것이 모두 유용한 경우; 단일 소비자에 대한 현재 구현으로 초당 약 33 개의 메시지를 읽을 수 있습니다.

Akka의 SQS 대기열에서 메시지를 읽는 올바른 방법입니까? 그렇다면이 방법을 사용하여 외부로 확장 할 수 있으므로 메시지 사용 속도를 900 메시지/초에 근접 할 수 있습니다.

답변

5

Sadly Camel은 현재 SQS에서 메시지의 병렬 소비를 지원하지 않습니다.

http://camel.465427.n5.nabble.com/Amazon-SQS-listener-as-multi-threaded-td5741541.html

이 내가 AWS-자바 SDK를 사용하여 배치 메시지 SQS를 폴링하기 위해 내 자신의 배우를 작성했습니다 해결하기 위해. 나는 이것이 최선의 구현 인 경우 확실하지 않다, 그리고 그것은 물론 요청이 지속적으로 비어있는 큐를 치는되지 않습니다, 그것은 폴링 할 수있는 내 현재의 요구에 맞게 않도록에 따라 개선 될 수 있지만

def receive = { 
    case BeginPolling => { 
     // re-queue sending asynchronously 
     self ! BeginPolling 
     // traverse the response 
     val deleteMessageList = new ArrayList[DeleteMessageBatchRequestEntry] 
     val messages = sqs.receiveMessage(receiveMessageRequest).getMessages 
     messages.toList.foreach { 
     node => { 
      deleteMessageList.add(new DeleteMessageBatchRequestEntry(node.getMessageId, node.getReceiptHandle)) 
      //log.info("Node body: {}", node.getBody) 
      filterSupervisor ! node.getBody 
     } 
     } 
     if(deleteEntryList.size() > 0){ 
     val deleteMessageBatchRequest = new DeleteMessageBatchRequest(queueName, deleteMessageList) 
     sqs.deleteMessageBatch(deleteMessageBatchRequest) 
     } 
    } 

    case _ => { 
     log.warning("Unknown message") 
    } 
    } 

동일한 대기열의 메시지

이것으로 SQS에서 약 133 (메시지/초)/배우를 얻는 중입니다.

1

낙타 2.15는 concurrentConsumers를 지원합니다. 단, akka camel 2.15 지원 여부와 여러 소비자가 있더라도 하나의 소비자 액터가 있으면 차이가 있는지 모릅니다.