2012-10-13 5 views
0

컨텍스트 : Clojure + RabbitMQ (Langohr을 통해), 후속 작업은 this question입니다.Clojure + RabbitMQ/multithreaded 메시지 소비량

RabbitMQ mq (직접 교환기에서 메시지를 가져 와서 메시지 처리 후 팬 아웃 교환기에 게시)에서 메시지를 소비하는 데 이상한 결과가 나타납니다. 왜 메시지가 결국 개의 개별 스레드에 쓰이는 지 이해할 수 없습니다. (몇 개의 메시지마다 스레드 전환이 발생합니다.)

소비자는 별도의 스레드에서 시작합니다 (IO 예외가 발생하는 경우 주 스레드가 손상되는 것을 방지하기 위해)하지만 전환에 대해서는 설명하지 않습니다.

; Message handler 
(defn message-handler 
    [pub-name ch metadata ^bytes payload] 
    (let [msg (json/parse-string (String. payload "UTF-8")) 
     content (string/join " " (map msg '("title" "link" "body"))) 
     tags (pluck-tags content)] 
    (println (format "HANDLER %s: Message: %s | found tags: %s" 
        (Thread/currentThread) 
        (msg "title") 
        (tags-to-csv tags))) 
    (nil))) 
    ; (lb/publish ch pub-name "" (json/generate-string (assoc msg "tags" (tags-to-csv tags)))))) 


(defn -main 
    [& args] 
    (let [conn   (rmq/connect {:uri (System/getenv "MSGQ")}) 
     ch   (lch/open conn) 
     q-name  "q.events.tagger" 
     e-sub-name "e.events.preproc" 
     e-pub-name "e.events" 
     routing-key "tasks.taggify"] 
    (lq/declare ch q-name :exclusive false :auto-delete false) 
    (le/declare ch e-pub-name "fanout" :durable false) 
    (lq/bind ch q-name e-sub-name :routing-key routing-key) 
    (.start (Thread. (fn [] 
     (lcm/subscribe ch q-name (partial message-handler e-pub-name) :auto-ack true)))))) 

메시지 처리기는 현재 스레드와 수신 된 페이로드를 인쇄합니다. 이것은 내가 무엇을 얻을 수 있습니다 :

HANDLER in Thread[pool-1-thread-2,5,main]: Message: ... 
HANDLER in Thread[pool-1-thread-2,5,main]: Message: ... 
HANDLER in Thread[pool-1-thread-3,5,main]: Message: ... 
HANDLER in Thread[pool-1-thread-3,5,main]: Message: ... 
HANDLER in Thread[pool-1-thread-3,5,main]: Message: ... 
HANDLER in Thread[pool-1-thread-4,5,main]: Message: ... 
HANDLER in Thread[pool-1-thread-4,5,main]: Message: ... 

참고

내가 에이전트와 함께 연주하는 동안이 나타났습니다. 각 메시지를 자체 CPU 바운드 스레드 풀에서 처리하고 무제한 (IO) 스레드 풀에 게시하려고했습니다. 그러나 현재 스레드를 인쇄 한 후에 에이전트 (또는 선물)를 사용하지 않아도 메시지가 다른 스레드에 의해 처리된다는 것을 알았습니다.

답변

0

여기 Langohr의 저자.

코드에 누락 된 부분이 있어야합니다. 에이전트에서이 출력을 얻으면 그 값은 입니다. Clojure 에이전트 (또한 선물 및 약속)는 스레드 풀을 사용합니다. Langohr의 langohr.consumers/subscribe 또는 RabbitMQ Java 클라이언트의 기본 QueueingConsumer는 그렇지 않습니다.

+0

답변 해 주셔서 감사합니다. 질문을 편집했습니다 (메시지 처리기 추가). 보시다시피 요원은 없습니다. – neektza

1

1) 메시지 교환 중에 라우팅 키가 전혀 사용되지 않았 음을 의미합니다. 팬 아웃 교환기는 메시지를 모든 큐에 바인딩합니다. 라우팅 키를 사용하려면 직접 또는 주제 교환을 사용하십시오.

2) 항상 동일한 큐 이름을 사용합니다. 즉, 코드에서 수행중인 작업은 동일한 큐에 여러 소비자를 추가하는 것입니다. 즉, rabbitmq는 사용자 주위에 로빈 메시지를 순환시킬 것입니다.

+0

팬 아웃 교환 (e-pub-name)은 처리 된 메시지를 게시하는 데 사용됩니다. 직접 교환 (e-sub-name)은 사전 처리 대기열의 메시지를 청취하는 데 사용됩니다. 그래서 클로제 프로세스는 중간 사람입니다. 큰 그림 : ** 생산자 ** => 직접 교환 -> preproc 큐 => 클로저 프로세스 (또는로드에 따라 다중 프로세서) => 팬 아웃 교환 -> ** 소비자 ** – neektza

+0

Clojure 프로세스가 확장 가능하다는 의미이므로 직접 교환을 사용하여 수신 대기하는 이유 - 더 많은 프로세스를 시작하고 메시지를 라운드 로빈 방식으로 라우팅해야합니다. 메시지를 처리 ​​한 후 다른 소비자에게 게시합니다. – neektza