2011-08-17 3 views
0

Ruby amqp 라이브러리의 v0.7.1과 Ruby 1.8.7을 사용하여 RabbitMQ 서버에 짧은 (~ 40 바이트) 메시지를 대량으로 게시하려고합니다. 내 프로그램의 메인 루프는 (물론, 정말 루프,하지만 여전히) 다음과 같습니다많은 수의 메시지를 AMQP 대기열에 게시

AMQP.start(:host => '1.2.3.4', 
     :username => 'foo', 
     :password => 'bar') do |connection| 

    channel = AMQP::Channel.new(connection) 
    exchange = channel.topic("foobar", {:durable => true}) 
    i = 0 

    EM.add_periodic_timer(1) do 
    print "\rPublished #{i} commits" 
    end 

    results = get_results # <- Returns an array 

    processor = proc do 
    if x = results.shift then 
     exchange.publish(x, :persistent => true, 
         :routing_key => "test.#{i}") 
     i += 1 
     EM.next_tick processor 
     end 
    end 
    EM.next_tick(processor) 
    AMQP.stop {EM.stop} end 

코드는 결과 배열이 잘 처리하기 시작하지만, 잠시 후 (보통, 12K 메시지 정도 후)을 다음 오류로 사망합니다.

/Library/Ruby/Gems/1.8/gems/amqp-0.7.1/lib/amqp/channel.rb:807:in `send': 
The channel 1 was closed, you can't use it anymore! (AMQP::ChannelClosedError) 

메시지가 큐에 저장되어 있지 않습니다. 이 오류는 프로그램에서 대기열 서버로의 네트워크 활동이 시작될 때 발생하는 것 같습니다.

내가 뭘 잘못하고 있니?

+0

RabbitMQ 로그 란 무엇입니까? 브로커가 아직 실행 중입니까? 'lsof -i : 5672'는 무엇을 반환합니까? –

+0

특별한 것은 없지만 스크립트가 시작될 때 연결이 열리고 닫히는 것으로 나타납니다. RabbitMQ는 코드가 실패한 후에도 다른 대기열과 클라이언트에 계속 정상적으로 작동합니다. 이것이 RabbitMQ의 문제라고 생각하지 않습니다. –

답변

-1

첫 번째 실수는 사용중인 RabbitMQ 버전을 게시하지 않았다는 것입니다. 많은 사람들이 OS 패키지 저장소에있는 오래된 버전 1.7.2를 사용하고 있습니다. 당신이 보낸 메시지의 양을 보내는 누군가를위한 나쁜 이동. RabbitMQ 사이트에서 RabbitMQ 2.5.1을 가져와 기본 시스템 패키지를 제거하십시오.

두 번째 실수는 RabbitMQ 로그에 무엇이 있는지 알려주지 않았다는 것입니다.

세 번째 실수는 메시지를 소비하는 것에 대해 아무 말도하지 않았다는 것입니다. 대기열을 선언하고 교환기에 바인딩 한 다른 프로세스가 어딘가에서 실행 중입니까. 어떤 사람이 RabbitMQ에 그것을 선언하고 교환기에 바인딩하지 않는 한 NO 메시지 큐가 있습니다. 큐의 바인딩 키가 게시하는 라우팅 키와 일치하는 경우에도 메시지가 흐르게됩니다.

네 번째 실수. 라우팅 키와 바인딩 키가 섞여 있습니다. 라우팅 키는 topic.test.json.echos와 같은 문자열이고, (대기열을 교환기에 바인드하는 데 사용되는) 바인딩 키는 주제 # 또는 주제와 같은 패턴입니다. .json. 이 고정되었을 때

는 잘 모르겠어요, 당신의 해명 후 에 관한 업데이트 된 버전하지만 문제는 그것의 지속성 로그를 통해 압연 할 때 충돌 RabbitMQ를 일으키는 많은 양의 지속성 메시지와 1.7.2에 있던 , 충돌 후 다른 사람이 수동으로 롤오버를 undid 때까지 다시 시작할 수 없습니다.

연결이 열리고 닫히고 있다고 말하면 메시지 당 아닌 것이 좋기를 바랍니다. 그것은 AMQP를 사용하는 이상한 방법 일 것입니다.

반복하겠습니다. 생산자는 이 아니며 메시지를 대기열에 씁니다. 이들은 교환기에 메시지를 기록한 다음 라우팅 키 (문자열)와 대기열의 바인딩 키 (패턴)를 기반으로 메시지를 대기열로 라우팅합니다. 귀하의 예제에서 나는 # 기호의 사용법을 잘못 읽었지만, 대기열을 선언하고이를 교환기에 묶는 것은 아무것도 보지 못했습니다.

+0

분명히하자. 1. 사실 데비안의 기본 버전 (1.8.1)을 사용하고 있습니다. Python의 AMQP 라이브러리를 사용하면이 버전은 땀을 흘리지 않고도 수천만 건의 메시지를 견딜 수 있습니다. 2. 특별한 것은 없으나, 연결이 열린 다음 닫힌 것입니다. 3. 적절한 대기열 Y 인딩을 선언하는 여러 사용자가 있습니다 (검증 된 작업).그러나 소비자는 제작자가 메시지를 큐에 쓸 수있는 능력에 영향을주지 않아야합니다. 4. 아니요, 이것은 큐 바인딩이 아닙니다. 이것은 루비의 문자열 대체 방법입니다. –

+0

모두 나는 그것이 Ruby AMQP/eventmachine 버그이고 내 코드 또는 RabbitMQ의 버그는 아니라고 생각합니다. 적은 수의 메시지에 대해서 전체 설정이 올바르게 작동합니다. –

관련 문제