2017-05-09 1 views
3

혼합 Java 및 Python 구성 요소가있는 RabbitMQ에서 헤더 교환을 사용하려고하는데 확인 된 배달이 필요합니다.Rabbitmq 헤더 교환 및 확인 된 배달

나는 python (pika) 및 java 클라이언트와 다른 동작을하는 것처럼 보입니다. 파이썬에서

는 :

channel.exchangeDeclare("headers_test", "headers", true, false, null) 
channel.confirmSelect 

val props = MessageProperties.PERSISTENT_BASIC.builder 
¦ ¦ ¦ ¦ .headers(messageHeaders).build 
channel.basicPublish("headers_test", 
¦ ¦ ¦ ¦ ¦ ¦"", //routingKey 
¦ ¦ ¦ ¦ ¦ ¦true, //mandatory 
¦ ¦ ¦ ¦ ¦ ¦props, 
¦ ¦ ¦ ¦ ¦ ¦"data".getBytes) 
channel.waitForConfirmsOrDie() 
: 헤더는 어떤 바운드 소비자 일치하지 않고 메시지를 라우팅 할 수없는 경우

channel.exchange_declare(exchange='headers_test', 
¦ ¦ ¦ ¦ ¦ ¦ ¦type='headers', 
¦ ¦ ¦ ¦ ¦ ¦ ¦durable=True) 
channel.confirm_delivery() 
result = channel.basic_publish(exchange='headers_test', 
¦ ¦ ¦ ¦ ¦ ¦ routing_key='', 
¦ ¦ ¦ ¦ ¦ ¦ mandatory=True, 
¦ ¦ ¦ ¦ ¦ ¦ body=message, 
¦ ¦ ¦ ¦ ¦ ¦ properties=pika.BasicProperties(
¦ ¦ ¦ ¦ ¦ ¦ ¦ delivery_mode=2, 
¦ ¦ ¦ ¦ ¦ ¦ ¦ headers=message_headers)) 

, 결과는

그러나 자바/스칼라에서 거짓

여기서 messageHeaders가 일치하는 항목을 찾지 못하면 메시지가 오류없이 그냥 삭제 된 것처럼 보입니다.

뭔가 빠졌거나 두 클라이언트의 동작이 실제로 다릅니다. 어떻게 java에서 헤더 교환을 사용하여 확인 된 배달을받을 수 있습니까?

참고 : 대기열 라우팅 설정에 이미 "복잡한"교환을 했으므로 게임에 데드 레터 라우팅을 추가하지 않고 그냥 보내야합니다.

답변

1

헤더와 일치하는 대기열이 없더라도 메시지가 확인 된 것으로 간주되는 문제. 워드 프로세서 (https://www.rabbitmq.com/confirms.html)에서 : 교환이 어떤 큐 (큐의 빈 목록을 반환)에 메시지하지 않습니다 경로를 확인하면 unroutable 메시지의 경우

브로커는 확인을 발행합니다. 메시지가 필수로 게시 된 경우 basic.return이 basic.ack보다 먼저 클라이언트에 전송됩니다. 부정 확인 (basic.nack)의 경우도 마찬가지입니다.

메시지가 라우팅되었는지 여부를 감지하려면 basic.return 메시지를 확인해야합니다.

나는 wireshark로 확인해 봤는데 실제로 메시지가 라우팅되지 않으면 AMQP basic.return 메시지가 있다는 것을 알 수있다.

replyCode = [312], replyText = [NO_ROUTE, 교환 :

나는 메시지가 나는이 얻을 전달되지 않은 경우가 참으로

channel.addReturnListener(new ReturnListener() { 
    @Override 
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { 
    System.out.println("App.handleReturn"); 
    System.out.println("replyCode = [" + replyCode + "], replyText = [" + replyText + "], exchange = [" + exchange + "], routingKey = [" + routingKey + "], properties = [" + properties + "], body = [" + body + "]"); 
    } 
}); 

그리고 시작해야 supppose 자바에 새앙 토끼의 동기 동작을 에뮬레이션하려면 = [headers_logs, 는 =은 [], 프로 ....

또한, D 당신이 할 수있는 것 같다 routingKey o .waitForConfirmsOrDie()에 의존하는 대신 메시지를 게시하고 확인 수신기를 등록하기 전에 현재 게시 시퀀스 번호를 사용합니다.

그래서 전체 코드 샘플은 다음과 같습니다
channel.addReturnListener(new ReturnListener() { 
     @Override 
     public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { 
     System.out.println("App.handleReturn"); 
     System.out.println("replyCode = [" + replyCode + "], replyText = [" + replyText + "], exchange = [" + exchange + "], routingKey = [" + routingKey + "], properties = [" + properties + "], body = [" + body + "]"); 
     } 
    }); 

    channel.addConfirmListener(new ConfirmListener() { 
     @Override 
     public void handleAck(long deliveryTag, boolean multiple) throws IOException { 
     System.out.println("App.handleAck"); 
     System.out.println("deliveryTag = [" + deliveryTag + "], multiple = [" + multiple + "]"); 
     } 

     @Override 
     public void handleNack(long deliveryTag, boolean multiple) throws IOException { 
     System.out.println("App.handleNack"); 
     System.out.println("deliveryTag = [" + deliveryTag + "], multiple = [" + multiple + "]"); 
     } 
}); 

long nextPublishSeqNo = channel.getNextPublishSeqNo(); 
System.out.println("nextPublishSeqNo = " + nextPublishSeqNo); 

channel.basicPublish("headers_logs", 
    "", 
    true, 
    props, 
    "data".getBytes()); 

그리고 당신은 당신이 메시지를 게시하기 전에 가지고 채널의 게시 일련 번호를 확인하는 데 필요한 리턴/확인 콜백의 내부

.

메시지가 어떤 대기열로 라우팅되지 않은 경우 RabbitMq는 확인 (배달 태그)을 포함하는 단일 basic.return 메시지를 다시 전송합니다. 메시지가 라우트 된 경우 RabbitMq은 확인을 포함하는 단일 bacic.ack 메시지를 다시 전송합니다.

이 RabbitMq 자바 클라이언트가 항상 메시지가 전달되었는지 확인하기 위해)을 basicConfirm (전에 그렇게 논리를 basicReturn() 콜백을 호출 여부를이 될 수 있음을 보인다

등록 반환과에 청취자 확인 채널; 채널의 다음 게시 순서 번호를 기억하십시오. 콜백 또는 확인 콜백을 기다립니다. 반송 콜백 인 경우 메시지가 라우팅되지 않았으므로 동일한 배달 태그에 대한 추가 확인을 무시해야합니다. handleRck()를 받기 전에 handleAck() 콜백을 받으면 메시지가 대기열로 라우팅되었음을 의미합니다.

어떤 경우에는 .handleNack()을 호출 할 수 있는지 확실하지 않지만.

관련 문제