메시지 대기열 (Amazon SQS)에서 항목을 소비하는 에이전트를 사용하는 가장 좋은 방법을 알아 내려고하고 있습니다. 지금은 대기열의 항목을 잡고 처리하는 함수 (process-queue-item)가 있습니다.대기열에서 소모하는 Clojure 에이전트
이러한 항목을 동시에 처리하려고하지만 상담원을 제어하는 방법에 대해 머리를 감쌀 수 없습니다. 기본적으로 큐에서 많은 항목을 가져 오지 않고 백 로그를 개발하지 않고 가능한 한 많은 에이전트를 모두 사용하기를 원합니다. (두 대의 컴퓨터에서 실행되도록 할 것이므로 항목을 대기열에 둘 필요가 있습니다. 정말 필요합니다).
내 구현을 개선하는 데 도움이 될만한 사람이 있습니까?
(def active-agents (ref 0))
(defn process-queue-item [_]
(dosync (alter active-agents inc))
;retrieve item from Message Queue (Amazon SQS) and process
(dosync (alter active-agents dec)))
(defn -main []
(def agents (for [x (range 20)] (agent x)))
(loop [loop-count 0]
(if (< @active-agents 20)
(doseq [agent agents]
(if (agent-errors agent)
(clear-agent-errors agent))
;should skip this agent until later if it is still busy processing (not sure how)
(send-off agent process-queue-item)))
;(apply await-for (* 10 1000) agents)
(Thread/sleep 10000)
(logging/info (str "ACTIVE AGENTS " @active-agents))
(if (> 10 loop-count)
(do (logging/info (str "done, let's cleanup " count))
(doseq [agent agents]
(if (agent-errors agent)
(clear-agent-errors agent)))
(apply await agents)
(shutdown-agents))
(recur (inc count)))))
이 같은 메시지 큐를 치료할 수있는 몇 가지 방법이 있나요 seq 그리고 나서 pmap을 사용하여 병렬화를 얻으시겠습니까? –
@Alex Stoddard : 제 경우에는 process-queue-item이 실제로 네트워크 IO에서 차단되므로, 머신에 코어가있는만큼 많은 스레드를 사용하기 때문에 pmap이 올바른 선택이라고 생각하지 않습니다. – erikcw
@erikw : 물론 이죠,하지만 이것은 단지 pmap 구현의 세부 사항입니다 (threads = #cores + 2). 매개 변수화 된 수의 스레드를 사용하여 pmap 버전을 작성할 수 없습니다.pmap 소스의 첫 번째 줄을 봅니다. (let [n (+ 2 (.. Runtime getRuntime availableProcessors)) –