2013-07-15 2 views
11

basic.consume을 사용하여 대기열에 가입 한 단순 게시자 및 소비자를 생성했습니다.RabbitMq에서 메시지를 수신하지 않음

내 소비자는 예외없이 작업이 실행될 때 메시지를 확인합니다. 예외가 생길 때마다 메시지를 확인하지 않고 일찍 돌아옵니다. 확인 된 메시지 만 대기열에서 사라 지므로 제대로 작동합니다.
이제 소비자가 실패한 메시지를 다시 가져 오도록하고 싶지만 그 메시지를 다시 받아 볼 수있는 유일한 방법은 소비자를 다시 시작하는 것입니다.

이 유스 케이스에 어떻게 접근해야합니까?

설정 코드

$channel = new AMQPChannel($connection); 

$exchange = new AMQPExchange($channel); 

$exchange->setName('my-exchange'); 
$exchange->setType('fanout'); 
$exchange->declare(); 

$queue = new AMQPQueue($channel); 
$queue->setName('my-queue'); 
$queue->declare(); 
$queue->bind('my-exchange'); 

소비자 코드

$queue->consume(array($this, 'callback')); 

public function callback(AMQPEnvelope $msg) 
{ 
    try { 
     //Do some business logic 
    } catch (Exception $ex) { 
     //Log exception 
     return; 
    } 
    return $queue->ack($msg->getDeliveryTag()); 
} 

생산자 코드

$exchange->publish('message'); 
+0

어떤 언어를 사용하며 어떤 코드를 제공 할 수 있습니까? – pinepain

+0

@ zaq178miami, 내 편집 된 메시지보기 –

+0

@Bram_Gerritsen, 내 답변 업데이트 참조 – pinepain

답변

15

메시지 w하는 경우 확인되지 않고 응용 프로그램이 실패하면 자동으로 다시 배달되며 봉투의 redelivered 속성은 true으로 설정됩니다 (사용자가 no-ack = true 플래그로 소비하지 않는 한).

UPD : 당신은 재전송 횟수가 RabbitMQ과 AMQP 프로토콜에서 구현되지 않는 동안

try { 
     //Do some business logic 
    } catch (Exception $ex) { 
     //Log exception 
     return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); 
    } 

무한 nacked 메시지을 조심 당신의 catch 블록에 재 전달 플래그 nack 메시지가

조금도.

당신이 그런 메시지로 혼란에 원하는 단순히 nack 메서드 호출하기 전에 몇 가지 sleep() 또는 usleep()을 추가 할 수있는 몇 가지 지연을 추가 할 수 있지만, 전혀 좋은 생각이 아니다하지 않습니다. 신뢰성, 표준, 명확한

  • 단점 :

    1. Dead Letter Exchanges

    • 전문가에 의존 :

      이 사이클에 재전송의 문제를 해결하기 위해 여러 기술 추가적인 로직을 필요로

    2.per message or per queue TTL

    • 프로를 사용하여 쉽게, 표준도 명확
    • 단점을 구현하기 : 긴 대기열에 당신이 어떤 메시지

    예 (메모를 느슨하게 할 수있다, 그 큐의 TTL을 위해 우리는 수를 통과 및 메시지 TTL의 - 무엇이든 할 것이다 숫자 문자열) : 메시지 TTL 당

    2.1 :

    $queue = new AMQPQueue($channel); 
    $queue->setName('my-queue'); 
    $queue->declareQueue(); 
    $queue->bind('my-exchange'); 
    
    $exchange->publish(
        'message at ' . microtime(true), 
        null, 
        AMQP_NOPARAM, 
        array(
         'expiration' => '1000' 
        ) 
    ); 
    

    2.2. 큐 TTL 단위 :

    $queue = new AMQPQueue($channel); 
    $queue->setName('my-queue'); 
    $queue->setArgument('x-message-ttl', 1000); 
    $queue->declareQueue(); 
    $queue->bind('my-exchange'); 
    
    $exchange->publish('message at ' . microtime(true)); 
    

    3. 보류 다시 배달 계산 또는 왼쪽을 다시 배달 번호 메시지 본문 또는 헤더

    • 프로 (일명 제한 또는 TTL IP 스택의 홉) : 당신에게 여분의 제어를 제공 메시지 수명 평일 응용 프로그램 수준
    • 단점 : 메시지를 수정하고 다시 게시해야 할 때 응용 프로그램마다 명확하지 않은 상당한 오버 헤드

    코드 :

    $queue = new AMQPQueue($channel); 
    $queue->setName('my-queue'); 
    $queue->declareQueue(); 
    $queue->bind('my-exchange'); 
    
    $exchange->publish(
        'message at ' . microtime(true), 
        null, 
        AMQP_NOPARAM, 
        array(
         'headers' => array(
          'ttl' => 100 
         ) 
        ) 
    ); 
    
    $queue->consume(
        function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) { 
         $headers = $msg->getHeaders(); 
         echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' '; 
         echo $msg->getDeliveryTag(), ' '; 
         echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl' , ' '; 
         echo $msg->getBody(), PHP_EOL; 
    
         try { 
          //Do some business logic 
          throw new Exception('business logic failed'); 
         } catch (Exception $ex) { 
          //Log exception 
          if (isset($headers['ttl'])) { 
           // with ttl logic 
    
           if ($headers['ttl'] > 0) { 
            $headers['ttl']--; 
    
            $exchange->publish($msg->getBody(), $msg->getRoutingKey(), AMQP_NOPARAM, array('headers' => $headers)); 
           } 
    
           return $queue->ack($msg->getDeliveryTag()); 
          } else { 
           // without ttl logic 
           return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); // or drop it without requeue 
          } 
    
         } 
    
         return $queue->ack($msg->getDeliveryTag()); 
        } 
    ); 
    

    흐름을 다시 배달 더 나은 제어 메시지에 다른 방법이 될 수 있습니다.

    결론 : 실버 총알 솔루션이 없습니다. 어떤 해결책이 필요에 가장 적합한 지 결정하거나 다른 것을 찾아야하지만 여기에서 공유하는 것을 잊지 마십시오.)

  • +0

    답변 해 주셔서 감사합니다. 'redelivered'는 실제로'true'로 설정되어 있습니다 만, 메시지를 재검토하기 위해 차단 소비자를 다시 시작해야합니다. –

    +0

    고마워,이게 내가 필요한거야. 무한정 재 전달 된 메시지를 방지하는 방법에 대한 지침이나 제안 사항을 알려주십시오. 주어진 대기 시간만큼 대기열로 대기열을 지연시킬 수 있으면 좋겠지 만 소비하는 서버에 과부하가 걸리지는 않습니다. –

    +0

    여기에 당신이 가서 대답을 다시 – pinepain

    0

    소비자를 다시 시작하지 않으려면 basic.recover AMQP 명령이 당신이 될 수 있습니다 필요. AMQP protocol에 따르면 :

    basic.recover(bit requeue) 
    
    Redeliver unacknowledged messages. 
    
    This method asks the server to redeliver all unacknowledged messages on a specified channel. 
    Zero or more messages may be redelivered. This method replaces the asynchronous Recover. 
    
    +0

    이 메서드는 사용중인 클라이언트 API의 일부로 보이지 않습니다. http://www.php.net/manual/en/book.amqp.php –

    +1

    RabbitMQ는이 방법을 부분적으로 지원합니다. [official doc on it] (https://www.rabbitmq.com/specification.html# method-status-basic.recover) – pinepain

    관련 문제