컨텍스트 : Clojure + RabbitMQ (Langohr을 통해), 후속 작업은 this question입니다.Clojure + RabbitMQ/multithreaded 메시지 소비량
RabbitMQ mq (직접 교환기에서 메시지를 가져 와서 메시지 처리 후 팬 아웃 교환기에 게시)에서 메시지를 소비하는 데 이상한 결과가 나타납니다. 왜 메시지가 결국 개의 개별 스레드에 쓰이는 지 이해할 수 없습니다. (몇 개의 메시지마다 스레드 전환이 발생합니다.)
소비자는 별도의 스레드에서 시작합니다 (IO 예외가 발생하는 경우 주 스레드가 손상되는 것을 방지하기 위해)하지만 전환에 대해서는 설명하지 않습니다.
; Message handler
(defn message-handler
[pub-name ch metadata ^bytes payload]
(let [msg (json/parse-string (String. payload "UTF-8"))
content (string/join " " (map msg '("title" "link" "body")))
tags (pluck-tags content)]
(println (format "HANDLER %s: Message: %s | found tags: %s"
(Thread/currentThread)
(msg "title")
(tags-to-csv tags)))
(nil)))
; (lb/publish ch pub-name "" (json/generate-string (assoc msg "tags" (tags-to-csv tags))))))
(defn -main
[& args]
(let [conn (rmq/connect {:uri (System/getenv "MSGQ")})
ch (lch/open conn)
q-name "q.events.tagger"
e-sub-name "e.events.preproc"
e-pub-name "e.events"
routing-key "tasks.taggify"]
(lq/declare ch q-name :exclusive false :auto-delete false)
(le/declare ch e-pub-name "fanout" :durable false)
(lq/bind ch q-name e-sub-name :routing-key routing-key)
(.start (Thread. (fn []
(lcm/subscribe ch q-name (partial message-handler e-pub-name) :auto-ack true))))))
메시지 처리기는 현재 스레드와 수신 된 페이로드를 인쇄합니다. 이것은 내가 무엇을 얻을 수 있습니다 :
HANDLER in Thread[pool-1-thread-2,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-2,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-3,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-3,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-3,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-4,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-4,5,main]: Message: ...
참고
내가 에이전트와 함께 연주하는 동안이 나타났습니다. 각 메시지를 자체 CPU 바운드 스레드 풀에서 처리하고 무제한 (IO) 스레드 풀에 게시하려고했습니다. 그러나 현재 스레드를 인쇄 한 후에 에이전트 (또는 선물)를 사용하지 않아도 메시지가 다른 스레드에 의해 처리된다는 것을 알았습니다.
답변 해 주셔서 감사합니다. 질문을 편집했습니다 (메시지 처리기 추가). 보시다시피 요원은 없습니다. – neektza