2014-05-14 8 views
0

가능한 한 빨리 ActiveMQ 대기열에 게시하려는 아주 기본적인 스켈레톤 스칼라 응용 프로그램 (Akka, Camel 및 ActiveMQ 사용)이 있지만 그 대기열에서만 소비합니다 특정 비율 (예 : 초당 1). 여기 Akka, Camel 및 ActiveMQ : 소비자 제한

는 것을 설명하는 일부 코드입니다 : 내 주요 방법에서 MyConsumer.scala

class MyConsumer extends Actor with Consumer { 

    def endpointUri = "activemq:myqueue" 

    def receive = { 
    case msg: CamelMessage => println("Ping!") 
    } 
} 

MyProducer.scala

class Producer extends Actor with Producer with Oneway { 
    def endpointUri = "activemq:myqueue" 
} 

, 나는 그 모든 보일러가 Camel을 설정하고 ActiveMQ와 통화하게하고 나서 :

// Start the consumer 
val consumer = system.actorOf(Props[MyConsumer]) 
val producer = system.actorOf(Props[MyProducer]) 

// Imagine I call this line 100+ times 
producer ! "message" 

MyProducer이 가능한 한 빨리 ActiveMQ로 항목을 전송하도록하려면 어떻게해야합니까? 제한 없음) MyConsumer은 모든 메시지를 x 초 단위로만 읽습니다. 나는 각 메시지가 마지막 순간 (즉, MyConsumer에 의해 읽혀질 때까지) ActiveMQ 큐에 남아 있기를 바란다.

지금까지 나는 특정 비율로 섭취하기 위해 TimerBasedThrottler을 사용했으나, 여전히 모든 메시지를 한 번에 소비합니다.

나는 길을 따라 뭔가를 놓친 경우 사과, 나는 Akka/Camel에 비교적 새로운입니다.

+0

매번 새 제작자를 만들면 안됩니다. ActorRef를 만들고 저장하십시오. – Ryan

+0

감사합니다. Ryan, 좋은 장소 - 이제는 한 번만 인스턴스화하고 있습니다. –

답변

0

"MyConsumer"를 구성하는 소비자는 몇 명입니까?

a) 단지 하나 일 경우 메시지를 읽거나 소비하는 단순한 잠자기가 작동하지 않는 이유가 분명하지 않습니다.

  • 각 소비자가 지정된 소비 속도 스로틀됩니다 : 당신이 필요로하는 동작을 여러 소비자가있는 경우

    . 이 경우 각 소비자 스레드는 여전히 a)에서 설명한대로 동작합니다.

  • 전체적인 소비자 풀이 소비 속도에 대해 조절됩니다. 이 경우 중앙 Throttler는 메시지 간 지연을 유지하고 필요한 지연이 충족 될 때까지 각 소비자를 차단해야합니다. 백 로그가있을 때 관리의 복잡성이있을 수 있습니다 - "추격"을 허용합니다. 당신은 아마 여기에 표류를 얻을 것입니다.

이 질문에서 뭔가 다른 것을 찾고 계셨습니까? 그렇다면 정교하게하십시오.

+0

오직 하나의'MyConsumer'가 있습니다. 그래서'Thread.sleep'을 추가하면됩니다. 그러나 모든 메시지가 즉시 ActiveMQ에서 큐에 저장되지 않는 것으로 나타났습니다. 반면에 초 당 1 개의 큐에서 큐를 대기열에 넣고 싶습니다. –

관련 문제