2010-04-08 3 views
26

메시지 대기열 (Amazon SQS)에서 항목을 소비하는 에이전트를 사용하는 가장 좋은 방법을 알아 내려고하고 있습니다. 지금은 대기열의 항목을 잡고 처리하는 함수 (process-queue-item)가 있습니다.대기열에서 소모하는 Clojure 에이전트

이러한 항목을 동시에 처리하려고하지만 상담원을 제어하는 ​​방법에 대해 머리를 감쌀 수 없습니다. 기본적으로 큐에서 많은 항목을 가져 오지 않고 백 로그를 개발하지 않고 가능한 한 많은 에이전트를 모두 사용하기를 원합니다. (두 대의 컴퓨터에서 실행되도록 할 것이므로 항목을 대기열에 둘 필요가 있습니다. 정말 필요합니다).

내 구현을 개선하는 데 도움이 될만한 사람이 있습니까?

(def active-agents (ref 0)) 

(defn process-queue-item [_] 
    (dosync (alter active-agents inc)) 
    ;retrieve item from Message Queue (Amazon SQS) and process 
    (dosync (alter active-agents dec))) 

(defn -main [] 
    (def agents (for [x (range 20)] (agent x))) 

    (loop [loop-count 0] 

    (if (< @active-agents 20) 
     (doseq [agent agents] 
     (if (agent-errors agent) 
      (clear-agent-errors agent)) 
     ;should skip this agent until later if it is still busy processing (not sure how) 
     (send-off agent process-queue-item))) 

    ;(apply await-for (* 10 1000) agents) 
    (Thread/sleep 10000) 
    (logging/info (str "ACTIVE AGENTS " @active-agents)) 
    (if (> 10 loop-count) 
     (do (logging/info (str "done, let's cleanup " count)) 
     (doseq [agent agents] 
     (if (agent-errors agent) 
      (clear-agent-errors agent))) 
     (apply await agents) 
     (shutdown-agents)) 
     (recur (inc count))))) 
+0

이 같은 메시지 큐를 치료할 수있는 몇 가지 방법이 있나요 seq 그리고 나서 pmap을 사용하여 병렬화를 얻으시겠습니까? –

+0

@Alex Stoddard : 제 경우에는 process-queue-item이 실제로 네트워크 IO에서 차단되므로, 머신에 코어가있는만큼 많은 스레드를 사용하기 때문에 pmap이 올바른 선택이라고 생각하지 않습니다. – erikcw

+0

@erikw : 물론 이죠,하지만 이것은 단지 pmap 구현의 세부 사항입니다 (threads = #cores + 2). 매개 변수화 된 수의 스레드를 사용하여 pmap 버전을 작성할 수 없습니다.pmap 소스의 첫 번째 줄을 봅니다. (let [n (+ 2 (.. Runtime getRuntime availableProcessors)) –

답변

6

당신이 요구하는 것은 작업을 약간의 상한을두고 계속 유지하는 것입니다. 이것에 대한 간단한 접근법은 세마포어를 사용하여 한계를 조정하는 것입니다. 여기에 접근 방법은 다음과 같습니다.

(let [limit (.availableProcessors (Runtime/getRuntime)) 
     ; note: you might choose limit 20 based upon your problem description 
     sem (java.util.concurrent.Semaphore. limit)] 
    (defn submit-future-call 
    "Takes a function of no args and yields a future object that will 
    invoke the function in another thread, and will cache the result and 
    return it on all subsequent calls to deref/@. If the computation has 
    not yet finished, calls to deref/@ will block. 
    If n futures have already been submitted, then submit-future blocks 
    until the completion of another future, where n is the number of 
    available processors." 
    [#^Callable task] 
    ; take a slot (or block until a slot is free) 
    (.acquire sem) 
    (try 
     ; create a future that will free a slot on completion 
     (future (try (task) (finally (.release sem)))) 
     (catch java.util.concurrent.RejectedExecutionException e 
     ; no task was actually submitted 
     (.release sem) 
     (throw e))))) 

(defmacro submit-future 
    "Takes a body of expressions and yields a future object that will 
    invoke the body in another thread, and will cache the result and 
    return it on all subsequent calls to deref/@. If the computation has 
    not yet finished, calls to deref/@ will block. 
    If n futures have already been submitted, then submit-future blocks 
    until the completion of another future, where n is the number of 
    available processors." 
    [& body] `(submit-future-call (fn [] [email protected]))) 

#_(example 
    user=> (submit-future (reduce + (range 100000000))) 
    #<[email protected]: :pending> 
    user=> (submit-future (reduce + (range 100000000))) 
    #<[email protected]: :pending> 
    user=> (submit-future (reduce + (range 100000000))) 
    ;; blocks at this point for a 2 processor PC until the previous 
    ;; two futures complete 
    #<[email protected]: :pending> 
    ;; then submits the job 

이제는 작업 자체를 조정하는 방법 만 조정하면됩니다. 이미 그 메커니즘을 가지고있는 것 같습니다. 루프 (submit-future (process-queue-item))

4

아마도 seque 함수를 사용할 수 있습니까? (doc seque) 인용 :

clojure.core/seque 
([s] [n-or-q s]) 
    Creates a queued seq on another (presumably lazy) seq s. The queued 
    seq will produce a concrete seq in the background, and can get up to 
    n items ahead of the consumer. n-or-q can be an integer n buffer 
    size, or an instance of java.util.concurrent BlockingQueue. Note 
    that reading from a seque can block if the reader gets ahead of the 
    producer. 

가 내가 생각하고있는 것은 네트워크를 통해 큐 항목을 받고 게으른 순서입니다; seque에 이것을 랩하고 Ref에 넣은 다음 작업자 에이전트가이 항목을 사용하도록합니다. seque. seque은 코드의 관점에서 볼 때 보통의 seq처럼 보이는 것을 반환합니다. 큐 마법은 투명하게 발생합니다. 내부에 넣은 시퀀스가 ​​청크 된 경우에는 한 번에 한 번에 여러 번 청크가 실행됩니다. 또한 seque에 대한 초기 호출은 초기 항목 또는 두 가지가 획득 될 때까지 차단되는 것으로 보입니다 (또는 경우에 따라 청크, 게으른 시퀀스가 ​​seque 자체보다 작동하는 방식과 더 관련이 있다고 생각합니다).

(전혀으로 테스트되었습니다 정말 스케치 한) 코드의 스케치 : 당신이 요청할 수 있도록 사실

(defn get-queue-items-seq [] 
    (lazy-seq 
    (cons (get-queue-item) 
     (get-queue-items-seq)))) 

(def task-source (ref (seque (get-queue-items-seq)))) 

(defn do-stuff [] 
    (let [worker (agent nil)] 
    (if-let [result 
      (dosync 
       (when-let [task (first @task-source)] 
       (send worker (fn [_] (do-stuff-with task)))))] 
     (do (await worker) 
      ;; maybe do something with worker's state 
      (do-stuff))))) ;; continue working 

(defn do-lots-of-stuff [] 
    (let [fs (doall (repeatedly 20 #(future (do-stuff))))] 
    fs))) 

당신이 아마 큐 항목 서열의 더 복잡한 프로듀서 싶어 새로운 항목을 만드는 것을 중지해야합니다 (모든 것이 정상적으로 종료 될 수있는 경우 필요하며, 작업 소스가 마를 때 미래가 사라질 것입니다. 이미 완료했는지 확인하려면 future-done?을 사용하십시오). 그리고 그것은 내가 처음 볼 수있는 것입니다 ... 여기에 더 많은 것들이있을 것이라 확신합니다. 하지만 일반적인 접근 방식이 효과가 있다고 생각합니다.

+0

코드 스케치의 마지막 줄에 수정을 추가하여 선물을 실제로 만들 것입니다. (모든 아이디어에 정말로 중요한 것은 ...-)) –

+0

이 코드를 이해하려고합니다. 작업 출처를 참조하는 이유는 무엇입니까? 당신은 언제든지 그것을 변경하지 않는 것 같습니다. –

+0

@Siddhartha Reddy : 언뜻 나는이 코드를 "* 정말 * 스케치"라고 불렀다. ;-)'dosync' 내부의'when-let'에는'(task-source rest를 바꾼다)'(또는'next')가 필요할 것 같습니다. 실제로 이것을 다시 생각해 보면,'seque'를 사용하는 것이 결국 좋은 생각인지 궁금합니다. 'seque'는 항목들이 작업자에 의해 요청되기 전에 꺼내기 때문에 로컬 기계의 충돌시 잃어버린 대기열에서 항목의 수를 증가 시킨다는 것이 나에게 보인다. 그런 다음 몇 가지 시나리오에서 성능면에서 좋을 수도 있습니다. 그건 –

23
(let [switch (atom true) ; a switch to stop workers 
     workers (doall 
       (repeatedly 20 ; 20 workers pulling and processing items from SQS 
        #(future (while @switch 
          (retrieve item from Amazon SQS and process)))))] 
    (Thread/sleep 100000) ; arbitrary rule to decide when to stop ;-) 
    (reset! switch false) ; stop ! 
    (doseq [worker workers] @worker)) ; waiting for all workers to be done 
+2

이것은 1.4에서 더 이상 작동하지 않습니다 ('future'와'future-call'은'IFn'을 반환하지 않습니다. 이것은'repeated'가 필요합니다). 하지만 함수에서 미래를 쉽게 감쌀 수 있습니다. (비록'future''를'#'로 붙이겠습니다.) –

+3

@AlexB 잘 잡으십시오. 1.4 문제는 아닙니다 : #이 코드가 있어야합니다. 감사! – cgrand

0

나는 아직도 언어와 초보자 해요,하지만 다음과 같은 솔루션은 나를 위해 작동하기 때문에 이것이 얼마나 관용적 확실하지 :

(let [number-of-messages-per-time 2 
     await-timeout 1000] 
    (doseq [p-messages (partition number-of-messages-per-time messages)] 
    (let [agents (map agent p-messages)] 
     (doseq [a agents] (send-off a process)) 
     (apply await-for await-timeout agents) 
     (map deref agents)))) 
관련 문제