2011-07-05 2 views
4

내가 부스트를 사용하고 :: ASIO부스트 : 스레드을 메시지를 수락 메시지 서비스를 실현하기 위해 보내 그들 비동기 어떤 메시지가없는 경우 처리 중이거나 대기열 메시지가 처리되고있는 경우.잘못 사용 :: ASIO 및 부스트 : 스레드

메시지의 속도가 내 머리에 높습니다. 약 초당 2.000 메시지. 너무 많은 메시지로 나는 아주 드문 메시지에 직면한다. 2.000 메시지에서 약 4-8이 손상되었습니다. 문제는 boost :: asio 및/또는 boost :: thread 라이브러리의 잘못된 사용 때문이라고 생각합니다.

구현 한 코드는 주로 this boost tutorial을 기반으로합니다. 나는 실수를 발견 할 수 없으며 주요 메시지가 나오기 때문에 문제의 범위를 좁히기가 어렵다.

혹시 다른 사람이 무슨 일이 벌어지고 있는지 생각해 보셨나요? 기본적으로이 클래스는 다음과 같은 방법으로 사용된다

:

(1) 생성자는 메시지

을 승인하고 전송 스레드, 따라서 서비스를 시작하기 위해 내 프로그램의 시작 부분에서 호출 (2) 메시지를 전송할 때마다 MessageService::transmitMessage()으로 전화를 걸어 async_write이라는 작업을 메시지 큐를 처리하는 스레드에 위임합니다.

using namespace google::protobuf::io; 
using boost::asio::ip::tcp; 

MessageService::MessageService(std::string ip, std::string port) : 
    work(io_service), resolver(io_service), socket(io_service) { 

    messageQueue = new std::deque<AgentMessage>; 
    tcp::resolver::query query(ip, port); 
    endpoint_iterator = resolver.resolve(query); 

    tcp::endpoint endpoint = *endpoint_iterator; 

    socket.async_connect(endpoint, boost::bind(&MessageService::handle_connect, 
      this, boost::asio::placeholders::error, ++endpoint_iterator)); 

    boost::thread t(boost::bind(&boost::asio::io_service::run, &io_service)); 
} 

void MessageService::await() { 

    while (!messageQueue->empty()) { 

     signal(SIGINT, exit); 

     int messagesLeft = messageQueue->size(); 
     sleep(3); 
     std::cout << "Pending Profiler Agents Messages: " 
       << messageQueue->size() << std::endl; 
     if (messagesLeft == messageQueue->size()) { 
      std::cout << "Connection Error" << std::endl; 
      break; 
     } 
    } 

    std::cout << i << std::endl; 
} 

void MessageService::write(AgentMessage agentMessage, long systemTime, 
     int JVM_ID) { 
    agentMessage.set_timestamp(Agent::Helper::getCurrentClockCycle()); 
    agentMessage.set_jvm_id(JVM_ID); 
    agentMessage.set_systemtime(systemTime); 
    io_service.post(boost::bind(&MessageService::do_write, this, agentMessage)); 
} 

void MessageService::do_close() { 
    socket.close(); 
} 

void MessageService::transmitMessage(AgentMessage agentMessage) { 

    ++i; 

    boost::asio::streambuf b; 
    std::ostream os(&b); 

    ZeroCopyOutputStream *raw_output = new OstreamOutputStream(&os); 
    CodedOutputStream *coded_output = new CodedOutputStream(raw_output); 

    coded_output->WriteVarint32(agentMessage.ByteSize()); 
    agentMessage.SerializeToCodedStream(coded_output); 

    delete coded_output; 
    delete raw_output; 

    boost::system::error_code ignored_error; 

    boost::asio::async_write(socket, b.data(), boost::bind(
      &MessageService::handle_write, this, 
      boost::asio::placeholders::error)); 
} 

void MessageService::do_write(AgentMessage agentMessage) { 

    bool write_in_progress = !messageQueue->empty(); 
    messageQueue->push_back(agentMessage); 

    if (!write_in_progress) { 
     transmitMessage(agentMessage); 
    } 
} 

void MessageService::handle_write(const boost::system::error_code &error) { 

    if (!error) { 
     messageQueue->pop_front(); 
     if (!messageQueue->empty()) { 
      transmitMessage(messageQueue->front()); 
     } 
    } else { 
     std::cout << error << std::endl; 
     do_close(); 
    } 
} 

void MessageService::handle_connect(const boost::system::error_code &error, 
     tcp::resolver::iterator endpoint_iterator) { 
    // can be used to receive commands from the Java profiler interface 
} 

MessageService::~MessageService() { 
    // TODO Auto-generated destructor stub 
} 

헤더 파일 :이 방법은 나에게 의심스러운 것 같다

using boost::asio::ip::tcp; 

class MessageService { 
public: 
    MessageService(std::string ip, std::string port); 
    virtual ~MessageService(); 
    void write(AgentMessage agentMessage, long systemTime, int JVM_ID); 
    void await(); 

private: 
    boost::asio::io_service io_service; 
    boost::asio::io_service::work work; 
    tcp::resolver resolver; 
    tcp::resolver::iterator endpoint_iterator; 
    tcp::socket socket; 
    std::deque<AgentMessage> *messageQueue; 

    void do_write(AgentMessage agentMessage); 

    void do_close(); 

    void handle_write(const boost::system::error_code &error); 

    void handle_connect(const boost::system::error_code &error, 
      tcp::resolver::iterator endpoint_iterator); 

    void transmitMessage(AgentMessage agentMessage); 
}; 
+0

'boost :: thread'에 대해 언급 했으므로 프로그램에서 둘 이상의 스레드를 사용하고 있다고 가정 할 수 있지만 메시지 대기열에 대한 액세스를 보장하기 위해 잠글 필요가 없습니다 (유일한 공유라고 가정합니다). 리소스)는 안전합니다 - 코드를 통과하지 못했음을 인정해야합니다. 큐에 대한 액세스가 단일 스레드 인 경우 ... –

+0

메시지와 큐를 처리하는 스레드가 하나만 있다고 생각합니다. 유일한 함수로, 적어도 내가 생각하기에 이것은 다중 스레드에 의해 호출되는이 방법이다. io_service.post (boost :: bind (& MessageService :: do_write, this, agentMessage)); 내가 생각한 것은 생성자에서 시작된 하나의 스레드에 의해서만 실행됩니다. 하지만 어쩌면 내가 틀렸어. –

+0

'transmitMessage'가 다중 스레드에 의해 호출되지 않으면, io 서비스에서 단일 스레드 만 전달됩니다. 위의 코드에서 아무 것도 잘못 볼 수 없습니다 (다른 스레드에서'write' 만 호출하면됩니다 - 메인 스레드라고 말하면됩니다) - 나는 리시버 코드를 검사 할 것입니다. – Nim

답변

2

void MessageService::transmitMessage(AgentMessage agentMessage) { 
    ++i; 

    boost::asio::streambuf b; 
    std::ostream os(&b); 

    ZeroCopyOutputStream *raw_output = new OstreamOutputStream(&os); 
    CodedOutputStream *coded_output = new CodedOutputStream(raw_output); 

    coded_output->WriteVarint32(agentMessage.ByteSize()); 
    agentMessage.SerializeToCodedStream(coded_output); 

    delete coded_output; 
    delete raw_output; 

    boost::system::error_code ignored_error; 

    boost::asio::async_write(socket, b.data(), boost::bind(
      &MessageService::handle_write, this, 
      boost::asio::placeholders::error)); 
} 

당신은 AgentMessage (BTW const를 참조를 통해 전달되어야하는)을로 직렬화하는 것으로 나타 streambuf. 그러나이 직렬화 된 데이터는 명시 적으로 async_write에 documentation

버퍼

데이터를 포함하는 하나 이상의 버퍼를 기록 할을 설명하는의 async_write 완료 핸들러가 호출 될 때까지 존재 보장되지 않습니다. 버퍼 개체는 필요에 따라 으로 복사 될 수 있지만 기본 메모리 블록 인 의 소유권은 호출자가 을 유지하므로 을 보장해야합니다. 처리기를 호출 할 때까지 유효합니다.

이 문제를 해결하려면 버퍼가 완료 핸들러가 호출 될 때까지 범위 내에 있어야합니다. 이 작업을 수행하는 한 가지 방법은 경계 완료 핸들러에 인수로 버퍼를 전달하는 것입니다 :

boost::asio::async_write(socket, b.data(), boost::bind(
      &MessageService::handle_write, this, 
      boost::asio::placeholders::error, 
      coded_output 
      // ^^^ buffer goes here 
      )); 

다음 완료 핸들러 내에서 삭제합니다. 알몸 포인터 대신 shared_ptr을 사용하는 것도 좋습니다.

+0

그래서 async_write가 즉시 반환되기 때문에 async_write가 완료되고 transmitMessage가 남아있을 때 알 수 없기 때문에 할당 된 버퍼가 어떻게되는지 확실하지 않을 수 있습니다. 이 문제를 해결하기 위해 무엇을 제안 하시겠습니까? –

+0

@platzhirsch'async_write '가 완료되는 시간은 잘 알려져 있습니다. 이것은 완료 핸들러가 호출 된 시점입니다. 한 가지 해결책으로 내 대답을 업데이트했습니다. –

관련 문제