2012-02-15 3 views
2

P2P 애플리케이션에 대한 비동기 네트워크 프로그래밍에서 문제가 발생했습니다. 내 응용 프로그램은 모두 서버와 클라이언트 여야합니다. 서버가 요청을 수신하면 k 다른 서버로 브로드 캐스트해야합니다. boost :: asio 예제의 HTTP Server 3 Example이 비동기 클라이언트 (클래스로 구현 됨)와 함께 잘 작동 할 수 있다고 생각했습니다. 새로운 요청이 서버에 의해 수신클라이언트가있는 C++ boost/asio 서버

ClientIO::ClientIO(boost::asio::io_service& io_service, tcp::resolver::iterator endpoint_iterator) 
: _io_service(io_service), 
     strand_(io_service), 
     resolver_(io_service), 
    socket_(io_service) 
    { 
    tcp::endpoint endpoint = *endpoint_iterator; 
     socket_.async_connect(endpoint, 
     boost::bind(&ClientIO::handle_after_connect, this, 
     boost::asio::placeholders::error, ++endpoint_iterator)); 
    } 

    void ClientIO::write(G3P mex) 
    { 
    _io_service.post(boost::bind(&ClientIO::writeMessage, this, mex)); 
    } 

    void ClientIO::writeMessage(G3P mex) 
    { 
    bool write_in_progress = !messages_queue_.empty(); 
    messages_queue_.push_back(mex); 
    if (!write_in_progress) 
    { 
    char* message=NULL; 
    boost::system::error_code ec; 
    if (messages_queue_.front().opcode == DATA) 
    { 
     message=(char*)malloc((10800)*sizeof(char)); 
    } 
    else 
     message=(char*)malloc(1024*sizeof(char)); 

    boost::asio::streambuf request; 
    std::ostream request_stream(&request); 
    serializeMessage(message, messages_queue_.front()); 
    request_stream << message; 
    boost::asio::async_write(socket_, boost::asio::buffer(message, strlen(message)), 
    strand_.wrap(
    boost::bind(&ClientIO::handle_after_write, this, 
    boost::asio::placeholders::error))); 
     free(message); 
    } 
    } 

    void ClientIO::readMessage() 
    { 
boost::asio::async_read(socket_, data_, 
    boost::bind(&ClientIO::handle_after_read, this, 
boost::asio::placeholders::error, 
boost::asio::placeholders::bytes_transferred 
    )); 
    } 

    void ClientIO::stop() 
    { 
    socket_.shutdown(tcp::socket::shutdown_both); 
    socket_.close(); 
    } 

    void ClientIO::handle_after_connect(const boost::system::error_code& error, 
    tcp::resolver::iterator endpoint_iterator) 
    { 
    if (error) 
    { 
    if (endpoint_iterator != tcp::resolver::iterator()) 
    { 
     socket_.close(); 
     tcp::endpoint endpoint = *endpoint_iterator; 
     socket_.async_connect(endpoint, 
     boost::bind(&ClientIO::handle_after_connect,this, 
     boost::asio::placeholders::error, ++endpoint_iterator)); 
    } 
    } 
    else 
    { 
    } 
    } 

    void ClientIO::handle_after_read(const boost::system::error_code& error, std::size_t bytes_transferred) 
    { 
    if (bytes_transferred > 0) 
    { 
    std::istream response_stream(&data_); 
    std::string mex=""; 
    std::getline(response_stream, mex); 
    deserializeMessage(&reply_,mex); 
    if (reply_.opcode == REPL) 
    { 
     cout << "ack received" << endl; 
    } 
    } 
    if (error) 
    { 
    ERROR_MSG(error.message()); 
    } 
    } 

    void ClientIO::handle_after_write(const boost::system::error_code& error) 
    { 
    if (error) 
    { 
    //   ERROR_MSG("Error in write: " << error.message()); 
    } 
    else 
    { 
    messages_queue_.pop_front(); 
    if (!messages_queue_.empty()) 
    { 
     cout << "[w] handle after write" << endl; 
     char* message; 
     if (messages_queue_.front().opcode == DATA) 
     { 
    message=(char*)malloc((10800)*sizeof(char)); 
     } 
     else 
    message=(char*)malloc(1024*sizeof(char)); 
     boost::asio::streambuf request; 
     std::ostream request_stream(&request); 
     serializeMessage(message, messages_queue_.front()); 
     request_stream << message; 
     boost::asio::async_write(socket_, boost::asio::buffer(message, strlen(message)), 
     strand_.wrap(
     boost::bind(&ClientIO::handle_after_write, this, 
     boost::asio::placeholders::error))); 
    } 

    boost::asio::async_read_until(socket_, data_,"\r\n", 
      strand_.wrap(
      boost::bind(&ClientIO::handle_after_read, this, 
     boost::asio::placeholders::error, 
     boost::asio::placeholders::bytes_transferred))); 
    } 
    } 

    ClientIO::~ClientIO() 
    { 
    cout << "service stopped" << endl; 
    } 

}

를, 그것은 새로운 자료 관리 클래스 양식 연결을 시작합니다 (부스트 :: ASIO 클라이언트의 예에서) 위에서 언급 한 클라이언트 클래스는이 다음이다 일부 계산 한 후, 상기의 클래스를 사용하여 (여기서 하나)에 다른 서버 큐 물품 각각 쓰기

client --write-> server ---write->\ 
            |--server1 
       server <--ACK----</ 

그 달성을위한 ACK를 대응하는에서, I는 io_service하여 만든 자세 클래스 변수로서 (io_service_test) 상기 자료 관리 생성자 다음으로 인스턴스화 :

DataManagement::DataManagement(){ 
    tcp::resolver resolver(io_service_test); 
    tcp::resolver::query query(remotehost, remoteport); 
    tcp::resolver::iterator iterator = resolver.resolve(query); 
    cluster = new cluster_head::ClusterIO(io_service_test,iterator); 
    io_service_test.run_one(); 
} 

을 그리고, 계산 한 후, 데이터를 보내

void DataManagement::sendTuple(. . .){ 

    . . . 
    io_service_test.reset(); 
    io_service_test.run(); 
    for (size_t i=0; i<ready_queue.size() ;i++) 
    { 
     cluster->write(fragTuple); 
    } 
} 

대편은 동일한 HTTP의 proxy3 인을 클라이언트 클래스가없는 동일한 방식으로 수정 된 예제. 문제는 때로는 모든 것이 잘 작동하고 때로는 실패하고 스택 트레이스가 생기거나 때로는 멈추지도 않으며 세그먼테이션 오류도 발생한다는 것입니다. 나는이 문제가 io_service 관리와 클래스 메서드의 수명에 영향을받지 않는다고 생각하지만, 나는 알 수 없다.

  • 아이디어가 있으십니까?
  • 이 경우에 맞는 몇 가지 예가 있거나 그것을 구현하는 더미 클래스가 있습니까?
+0

백 트레이스를 제공 할 수 있습니까? – tr9sh

답변

1

간단히 코드를 검토 한 결과, 다음과 같은 문제점이 있습니다.

  1. ClientIO::writeMessage 방법은
    • 가 데이터를 전송하지 않는, 메시지를
    • 전화 boost::asio::async_write을 메모리를 할당하기 때문에,받는 사람에게 잘못된 정보를 전송하지만 내부 ASIO의 요청의 큐에 요청을 둔다 , 즉 메시지는 언젠가 보내질 것입니다. boost::asio::buffer메시지을 복사하지 않습니다. 그것에 대한 참조 만 저장합니다.
    • free(message입니다. 즉, 메시지 ()에 할당 된 메모리는 대기열에 저장된 쓰기 요청이 실행될 때 덮어 쓸 수 있습니다.
  2. ClientIO::handle_after_write의 메모리 누수. 메시지은 할당되었지만 해제되지 않았습니다.
  3. ClientIO::readMessageboost::asio::async_read 메서드는 strand_.wrap 호출로 래핑되지 않습니다.

shared_const_buffer 클래스의 ASIO 버퍼 예제를 사용하려면 # 1과 # 2 문제가 필요합니다. boost::asio::async_write 전화와 동일한 방법으로 strand_.wrap 통화를 사용하려면 문제 # 3을 해결해야합니다.

+0

감사합니다. @ megabyte1024 – 0x41ndrea

관련 문제