2012-01-13 2 views
2

프로듀서 끝에서 흐름 제어 상황을 처리하려고합니다. 최대 대기열 크기가 설정된 qpid-broker에 대기열이 있습니다. 또한 queue에 flow_stop_count 및 flow_resume_count를 설정하십시오.아파치 qpid를 사용하는 동안 jms 메시징에서 프로듀서 흐름 제어를 처리하는 방법

이제 제작자는이 flow_stop_count에 도달 할 때까지 계속 메시지를 생성합니다. 이 수를 초과하면 Exception listener가 처리하는 예외가 발생합니다. 이제 언젠가 큐의 소비자가 따라 잡을 것이고 flow_resume_count에 도달하게 될 것입니다. 문제는 생산자가이 사건에 대해 어떻게 알고 있는지입니다. 여기

는 생산자

connection connection = connectionFactory.createConnection(); 
    connection.setExceptionListenr(new MyExceptionListerner()); 
    connection.start(); 
    Session session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); 
    Queue queue = (Queue)context.lookup("Test"); 
    MessageProducer producer = session.createProducer(queue); 
    while(notStopped){ 
     while(suspend){//---------------------------how to resume this flag??? 
      Thread.sleep(1000); 
     } 
     TextMessage message = session.createTextMessage(); 
     message.setText("TestMessage"); 
     producer.send(message); 
    } 
    session.close(); 
    connection.close(); 

의 샘플 코드이고 예외 청취자

private class MyExceptionListener implements ExceptionListener { 
    public void onException(JMSException e) { 
     System.out.println("got exception:" + e.getMessage()); 
     suspend=true; 
    } 
} 

을 위해 지금 exceptionListener로 예외에 대한 일반적인 수신기, 그래서 일시 중지하는 것이 좋습니다 안 그것을 통한 생산자 흐름.

내가 필요한 것은 아마도 메시지를 보내기 전에 확인하는 데 사용할 수있는 produer.isFlowStopped()과 같은 생산자 수준의 메소드 일 것입니다. 이러한 기능이 qpid api에 존재합니까?

qpid website에는 이것이 가능하다고 제안하는 문서가 있습니다. 그러나 나는 어디에서든지 이것이 행해지는 어떤 예도 발견 할 수 없었다.

이러한 종류의 시나리오를 처리하는 표준 방법이 있습니까?

답변

1

Apache QPid 문서에서 알 수 있듯이 flow_resume_count 및 flow_stop_count가 생성자가 차단되기 시작하는 것으로 보입니다.

따라서 유일한 옵션은 소프트웨어가 메시지가 다시 흐르기 시작할 때까지 정기적으로 폴링하는 것입니다.

추출물 here.

생산자가 지나치게 많은 대기열로 전송하면 브로커는 더 이상 메시지를 보내지 않도록 클라이언트에 지시하여 응답합니다. 이것의 영향은 브로커가 흐름 제어 명령을 철회 할 때까지 전송하려는 모든 시도가 차단된다는 것입니다.

클라이언트를 차단하는 동안 흐름 제어를 기다리는 동안 차단 된 사실을 주기적으로 기록합니다.흐름 제어 시행 브로커로 인해 5S 지연 보낼 메시지 AMQSession 경고 - - 브로커는 흐름 제어 가 AMQSession 경고 적용되어 강제 -

는 AMQSession 경고 메시지가 일정 기간 후에는 흐름 제어 시행 브로커에 의한 10S 지연 보내기 send는 타임 아웃하고 JMSException을 호출 코드에 던집니다.

ERROR AMQSession - 브로커 강제 흐름 제어에서 대기중인 시간 초과로 인해 메시지 보내기가 실패했습니다.

이 문서에서 이것은 제작자를 관리하는 소프트웨어가 스스로 관리해야 함을 의미합니다. 따라서 기본적으로 대기열이 너무 많아서 예외를 받으면 메시지를 보내려는 폴링 및 재 시도를하는 것이 좋습니다.

+0

예. 나는 이것을 정확히 수행해야만했다. 예외가 생기면 메시지를 보내지 않고 대기열 크기를 확인하기 위해 폴링을 멈춘다. qpid api는 메소드가 현재 큐 크기를 알 수있게 해줍니다. 흐름이 멈출 때를 나타내는 것은 아닙니다. 따라서 큐 크기를 예외로 사용하고 큐 크기가 떨어질 때 생산자를 중지시켜 최대 값 (예외 큐 크기)의 80 %라고 말할 수 있습니다. – Raks

+0

예 선점하는 것이 좋습니다. qpid가 빌드되는 방식은 큐가 꽉 차있을 때 다소 흥미 롭습니다. 그런 다음 예외를 던지기 전에 최대 2 분 동안 전송을 차단합니다. 이는 장기간의 블로킹 작업을 처리 할 수 ​​없다면 제작자 코드가 불안정해질 수 있음을 의미합니다. – aldridmc

1

대기열의 용량 (대기열이 가득 찬 것으로 생각되는 바이트 수) 및 대기열의 flowResumeCapacity (생성자가 비어있는 대기열 크기) 속성을 설정할 수 있습니다.

크기가 용량 값을 초과하면 send()가 차단됩니다.

리포에서이 test case file을 볼 수 있습니다.

관련 문제