2015-01-03 1 views
0

나는 boost :: asio가 새로 도입되어 현재 TCP 서버를 수신하는 간단한 서버 애플리케이션을 구축하려고한다. 클라이언트 코드와 서버 코드는 모두 인터넷에서 발견 된 예제에서 파생됩니다 (정상적으로 작동 함).
내 서버가 데이터없이 async_read 콜백을 계속 호출합니다. 나는 이미 일 찾고 있어요,하지만 예를 들어 내 코드 :(사이의 차이를하지 않았다
그래서 희망이 사람이 내가 잘못는지 말해 여기 부스트 :: asio :: read_async 연속 콜 : 데이터없이 콜백

내 서버 응용 프로그램 코드입니다.

예 - 클라이언트 : 나는 내가 (약간 변경) I-인터넷에서 볼 수있는 예 코드를 추가합니다, 완전성에 대한

[ 0.000]  test[0x0002]: INF: SERVER_TEST::main():56: argv[1]=s 
[ 0.000]  test[0x0002]: INF: SERVER_TEST::main():78: startServer=1 startClient=0 
[ 0.000]  test[0x0002]: INF: SERVER_TEST::main():79: adr=127.0.0.1 port=12342 _port=12342 
[ 0.000]  test[0x0002]: INF: SERVER_TEST::main():86: start server thread 
[ 0.000]  test[0x0002]: INF: SERVER_TEST::operator()():92: create TCP endpoint 
[ 0.000]  test[0x0002]: INF: SERVER_TEST::operator()():95: create TCP socket 
[ 0.000]  test[0x0002]: INF: SERVER_TEST::operator()():97: create TCP acceptor 
[ 0.000]  test[0x0002]: INF: SERVER_TEST::operator()():100: call async_accept 
[ 0.000]  test[0x0002]: INF: SERVER_TEST::operator()():117: notify server is init 
[ 0.000]  test[0x0002]: INF: SERVER_TEST::operator()():119: io_service_server.run() 
[ 26.599]  test[0x0002]: INF: SERVER_TEST::operator()():104: async_accept callback 
[ 26.599]  test[0x0002]: INF: SERVER_TEST::operator()():107: call doRead 
[ 26.599]  test[0x0002]: INF: SERVER_TEST::doRead():23: call async_read 
[ 26.599]  test[0x0002]: INF: SERVER_TEST::operator()():29: async_read callback 
[ 26.599]  test[0x0002]: INF: SERVER_TEST::operator()():32: accept 0 bytes of data from client 
[ 26.599]  test[0x0002]: INF: SERVER_TEST::operator()():38: call doRead 
[ 26.599]  test[0x0002]: INF: SERVER_TEST::doRead():23: call async_read 
[ 26.599]  test[0x0002]: INF: SERVER_TEST::operator()():29: async_read callback 
[ 26.599]  test[0x0002]: INF: SERVER_TEST::operator()():32: accept 0 bytes of data from client 
[ 26.599]  test[0x0002]: INF: SERVER_TEST::operator()():38: call doRead 
[ 26.599]  test[0x0002]: INF: SERVER_TEST::doRead():23: call async_read 
[ 26.599]  test[0x0002]: INF: SERVER_TEST::operator()():29: async_read callback 
[ 26.599]  test[0x0002]: INF: SERVER_TEST::operator()():32: accept 0 bytes of data from client 
[ 26.599]  test[0x0002]: INF: SERVER_TEST::operator()():38: call doRead 
[...] a.s.o. 

:

#define DMOD DebugIds::TEST 
#define DSUBID 0x0002 
#define CLASSNAME "CLIENT_TEST" 
#include <debug/dbg.h> 

#include <memory> 

#include <boost/thread.hpp> 
#include <boost/asio.hpp> 

using boost::asio::ip::tcp; 

void doRead (tcp::socket& socket) 
{ 
    std::vector<char> buffer (0,1024); 
    INF ("call async_read"); 
    boost::asio::async_read (
     socket, 
     boost::asio::buffer(buffer.data(), buffer.size()), 
     [&socket](boost::system::error_code ec, std::size_t length) 
     { 
      INF ("async_read callback"); 
      if (!ec) 
      { 
       INF ("accept %d bytes of data from client", length); 
      } 
      else 
      { 
       ERR ("cannot read from client"); 
      } 
      INF ("call doRead"); 
      doRead (socket); 
     } 
    ); 
} 


int main(int argc, char* argv[]) 
{ 
    bool startClient = true; 
    bool startServer = true; 

    Debug::Instance()->setModuleLevel (DebugIds::TEST, INFO); 
    Debug::Instance()->setModuleLevel (DebugIds::SERVER, DETAIL); 
    Debug::Instance()->setModuleLevel (DebugIds::CLIENT, DETAIL); 

    if (argc > 1) 
    { 
     INF ("argv[1]=%s", argv[1]); 
     if (strcmp ("s", argv[1]) == 0) 
     { 
      startClient = false; 
     } 
     else if (strcmp ("c", argv[1]) == 0) 
     { 
      startServer = false; 
     } 
    } 


    boost::thread serverThread; 
    boost::thread clientThread; 

    boost::mutex mutex; 
    boost::condition_variable clientcond; 

    unsigned int port = 12342; 
    std::string _port = "12342"; 
    std::string adr = "127.0.0.1"; 

    INF ("startServer=%d startClient=%d", startServer, startClient); 
    INF ("adr=%s port=%d _port=%s", adr.c_str(), port, _port.c_str()); 

    boost::asio::io_service io_service_server; 
    if (startServer) 
    { 
     // start threads 
     mutex.lock(); 
     INF ("start server thread"); 
     serverThread = boost::thread (
      [&]() //lambda method 
      { 
       try 
       { 
        INF ("create TCP endpoint"); 
        boost::asio::ip::tcp::endpoint ep(boost::asio::ip::address::from_string(adr), port); 
        //boost::asio::ip::tcp::endpoint ep(tcp::v4(), port); 
        INF ("create TCP socket"); 
        tcp::socket m_socket(io_service_server); 
        INF ("create TCP acceptor"); 
        tcp::acceptor m_acceptor(io_service_server,ep); 

        INF ("call async_accept"); 
        m_acceptor.async_accept (m_socket, 
         [&](boost::system::error_code ec) 
         { 
          INF ("async_accept callback"); 
          if (!ec) 
          { 
           INF ("call doRead"); 
           doRead (m_socket); 
          } 
          else 
          { 
           ERR ("cannot accept client"); 
          } 
         } 
        ); 

        INF ("notify server is init"); 
        clientcond.notify_one(); 
        INF ("io_service_server.run()"); 
        io_service_server.run(); 
       } 
       catch (std::exception& e) 
       { 
        ERR ("Exception: %s", e.what()); 
        return (0); 
       } 

       INF ("End server application"); 
       return (0); 
      }); 
    } 

    boost::asio::io_service io_service_client; 
    if (startClient) 
    { 
     clientThread = boost::thread (
      //[&mutex,&adr,&_port,&spC,&io_service_client]() 
      [&]() 
      { 
       INF ("wait for server to be initialized"); 
       boost::unique_lock<boost::mutex> lock(mutex); 
       clientcond.wait(lock); 
       INF ("server initialization finished"); 

       try 
       { 
        tcp::resolver resolver(io_service_client); 
        auto endpoint_iterator = resolver.resolve({ adr, _port }); 

        INF ("create and init testclient"); 

        #if 0 
        uint8_t count = 0; 
        std::vector<char> v(0,256); 
        for (auto c: v) 
        { 
         c = ++count; 
        } 
        spC->write(v.data(), v.size()); 
        #endif 

        io_service_client.run(); 
       } 
       catch (std::exception& e) 
       { 
        ERR ("Exception: %s", e.what()); 
       } 

       INF ("End client application"); 
       return (0); 
      }); 
    } 

    char line[256]; 
    while (std::cin.getline(line, 256)) 
    { 
     INF ("got %s", line); 
     if (0 == strcmp ("exit", line)) 
     { 
      #if 0 
      if (spC) 
      { 
       spC->close(); 
       io_service_client.stop(); 
       if (spC) clientThread.join(); 
       spC = std::shared_ptr<Client>(); 
      } 
      if (spS) 
      { 
       spS->stop(); 
       io_service_server.stop(); 
       if (spC) serverThread.join(); 
       spS = std::shared_ptr<Server>(); 
      } 
      #endif 

      break; 
     } 
    } 

    #if 0 
    if (spC) clientThread.join(); 
    if (spS) serverThread.join(); 
    #endif 

    return (0); 
} 

이 콘솔에 다음과 같은 출력을 생성 여기에 일 e 서버에서 데이터가 도착할 때까지 async_read 호출이 차단됩니다.

// 
// chat_client.cpp 
// ~~~~~~~~~~~~~~~ 
// 
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) 
// 
// Distributed under the Boost Software License, Version 1.0. (See accompanying 
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 
// 

#include <cstdlib> 
#include <deque> 
#include <iostream> 
#include <thread> 
#include <boost/asio.hpp> 
#include "chat_message.hpp" 

using boost::asio::ip::tcp; 

#define DMOD DebugIds::TEST 
#define DSUBID 0x0002 
#define CLASSNAME "CLIENT_TEST" 
#include <debug/dbg.h> 

typedef std::deque<chat_message> chat_message_queue; 

class chat_client 
{ 
public: 
    chat_client (boost::asio::io_service& io_service, 
        tcp::resolver::iterator endpoint_iterator) 
        : m_io_service(io_service), 
         m_socket(io_service) 
    { 
     INF (""); 
     do_connect (endpoint_iterator); 
    } 

    void write (const chat_message& msg) 
    { 
     INF ("post io service"); 
     m_io_service.post(
       [this, msg]() 
       { 
        bool write_in_progress = !m_write_msgs.empty(); 
        m_write_msgs.push_back(msg); 
        if (!write_in_progress) 
        { 
         INF ("call do_write"); 
         do_write(); 
        } 
       }); 
    } 

    void close() 
    { 
     INF ("close socket async"); 
     m_io_service.post([this]() { INF ("close socket"); m_socket.close(); }); 
    } 

private: 
    void do_connect(tcp::resolver::iterator endpoint_iterator) 
    { 
     INF ("async_connect"); 
     boost::asio::async_connect (m_socket, endpoint_iterator, 
       [this](boost::system::error_code ec, tcp::resolver::iterator) 
       { 
        INF ("on async_connect"); 
        if (!ec) 
        { 
         INF ("do_read_header"); 
         do_read_header(); 
        } 
       }); 
    } 

    void do_read_header() 
    { 
     INF ("call async_read"); 
     boost::asio::async_read (m_socket, 
            boost::asio::buffer(m_read_msg.data(), chat_message::header_length), 
            [this](boost::system::error_code ec, std::size_t /*length*/) 
            { 
             INF ("on async_read"); 
            if (!ec && m_read_msg.decode_header()) 
            { 
             INF ("call do_read_body"); 
             do_read_body(); 
            } 
            else 
            { 
             ERR ("connect failed"); 
             m_socket.close(); 
            } 
            }); 
    } 

    void do_read_body() 
    { 
     INF ("call async_read"); 
     boost::asio::async_read(m_socket, 
       boost::asio::buffer(m_read_msg.body(), m_read_msg.body_length()), 
       [this](boost::system::error_code ec, std::size_t /*length*/) 
       { 
        INF ("body: on async_read"); 
        if (!ec) 
        { 
         std::cout.write(m_read_msg.body(), m_read_msg.body_length()); 
         std::cout << "\n"; 
         INF ("body: call do_read_header"); 
         do_read_header(); 
        } 
        else 
        { 
         INF ("body: read failed"); 
         m_socket.close(); 
        } 
       }); 
    } 

    void do_write() 
    { 
     INF ("call async_write"); 
     boost::asio::async_write(m_socket, 
      boost::asio::buffer(m_write_msgs.front().data(), 
        m_write_msgs.front().length()), 
        [this] (boost::system::error_code ec, std::size_t length) 
        { 
         INF ("on async_write"); 
         if (!ec) 
         { 
          m_write_msgs.pop_front(); 
          if (!m_write_msgs.empty()) 
          { 
           INF ("call do_write"); 
           do_write(); 
          } 
          else 
          { 
          } 
         } 
         else 
         { 
          INF ("async_write failed"); 
          m_socket.close(); 
         } 
        }); 
    } 

private: 
    boost::asio::io_service& m_io_service; 
    tcp::socket m_socket; 
    chat_message m_read_msg; 
    chat_message_queue m_write_msgs; 
}; 

int main(int argc, char* argv[]) 
{ 
    Debug::Instance()->setModuleLevel (DebugIds::TEST, INFO); 
    Debug::Instance()->setModuleLevel (DebugIds::SERVER, DETAIL); 
    Debug::Instance()->setModuleLevel (DebugIds::CLIENT, DETAIL); 

    try 
    { 
     if (argc != 3) 
     { 
      INF ("Usage: chat_client <host> <port>"); 
      return(1); 
     } 

     INF ("Create IO service"); 
     boost::asio::io_service io_service; 

     INF ("Create TCP resolver"); 
     tcp::resolver resolver(io_service); 
     INF ("Create endpoint iterator"); 
     auto endpoint_iterator = resolver.resolve({ argv[1], argv[2] }); 
     INF ("Create client"); 
     chat_client c(io_service, endpoint_iterator); 

     INF ("Start thread that waits on io_service"); 
     std::thread t([&io_service](){ io_service.run(); }); 
     // run io_service in an own thread, so we can still handle the 
     // incomming data from commandline 

     char line[chat_message::max_body_length + 1]; 
     while (std::cin.getline(line, chat_message::max_body_length + 1)) 
     { 
      chat_message msg; 
      msg.body_length(std::strlen(line)); 
      std::memcpy(msg.body(), line, msg.body_length()); 
      msg.encode_header(); 
      INF ("write message %s", msg.data()); 
      c.write(msg); 
     } 

     c.close(); 

     INF ("Waits on io_service"); 
     t.join(); 
    } 
    catch (std::exception& e) 
    { 
     std::cerr << "Exception: " << e.what() << "\n"; 
    } 

    return (0); 
} 

그리고 여기에 서버 : 데이터 오류는이 게시물이 작성된 5 분 후에 발견 된 클라이언트

// 
// chat_server.cpp 
// ~~~~~~~~~~~~~~~ 
// 
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) 
// 
// Distributed under the Boost Software License, Version 1.0. (See accompanying 
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 
// 

#include <cstdlib> 
#include <deque> 
#include <iostream> 
#include <list> 
#include <memory> 
#include <set> 
#include <utility> 
#include <boost/asio.hpp> 
#include "chat_message.hpp" 

#define DMOD DebugIds::TEST 
#define DSUBID 0x0002 
#define CLASSNAME "SERVER_TEST" 
#include <debug/dbg.h> 

using boost::asio::ip::tcp; 

#define DBG(X) std::cout << __FUNCTION__ << X << __LINE__ << std::endl; 

//---------------------------------------------------------------------- 

typedef std::deque<chat_message> chat_message_queue; 

//---------------------------------------------------------------------- 

class chat_participant 
{ 
public: 
    chat_participant() : m_id(++m_counter) {} 
    virtual ~chat_participant() {} 
    virtual void deliver(const chat_message& msg) = 0; 
    static unsigned int m_counter; 
    unsigned int m_id; 
    unsigned int getId(){return (m_id);} 
}; 

unsigned int chat_participant::m_counter = 0; 

typedef std::shared_ptr<chat_participant> chat_participant_ptr; 

//---------------------------------------------------------------------- 

class chat_room 
{ 
public: 
    void join(chat_participant_ptr participant) 
    { 
     m_participants.insert (participant); 
     for ( auto msg: m_recent_msgs) 
     { 
      INF ("send available messages to session id=%d", participant->getId()); 
      participant->deliver(msg); 
     } 
    } 

    void leave(chat_participant_ptr participant) 
    { 
     INF ("session id=%d", participant->getId()); 
     m_participants.erase(participant); 
    } 

    void deliver(const chat_message& msg, unsigned int session_id) 
    { 
     INF ("deliver message to all participiants"); 

     m_recent_msgs.push_back(msg); 
     while (m_recent_msgs.size() > max_recent_msgs) 
      m_recent_msgs.pop_front(); 

     for (auto participant: m_participants) 
     { 
      if( session_id != participant->getId()) 
      { 
       INF (" -> deliver message to p=%d", participant->getId()); 
       participant->deliver(msg); 
      } 
     } 
    } 

private: 
    std::set<chat_participant_ptr> m_participants; 
    enum { max_recent_msgs = 100 }; 
    chat_message_queue m_recent_msgs; 
}; 

//---------------------------------------------------------------------- 

class chat_session 
    : public chat_participant, 
    public std::enable_shared_from_this<chat_session> 
{ 
public: 
    chat_session(tcp::socket socket, chat_room& room) 
     : m_socket (std::move(socket)), 
      m_room (room) 
    { 
     INF ("id=%d", getId()); 
    } 

    void start() 
    { 
     INF ("id=%d", getId()); 
     m_room.join (shared_from_this()); 
     do_read_header(); 
    } 

    void deliver(const chat_message& msg) 
    { 
     INF ("id=%d", getId()); 
     bool write_in_progress = !m_write_msgs.empty(); 
     m_write_msgs.push_back(msg); 
     if (!write_in_progress) 
     { 
      do_write(); 
     } 
    } 

private: 
    void do_read_header() 
    { 
     auto self(shared_from_this()); 

     INF ("id=%d call async_read", getId()); 
     boost::asio::async_read(m_socket, 
     boost::asio::buffer(m_read_msg.data(), chat_message::header_length), 
      [this, self](boost::system::error_code ec, std::size_t /*length*/) 
      { 
       INF ("id=%d read callback!", getId()); 
       if (!ec) 
       { 
        INF ("wait for next message to receive"); 
        do_read_header(); 
       } 
       else 
       { 
        m_room.leave(shared_from_this()); 
       } 
      }); 
    } 

    void do_write() 
    { 
     /** 
     * auto self(shared_from_this()); in combination with the function pointer 
     * [this, self](boost::system::error_code ec, std::size_t length) 
     * ensures, that the chat_session is alive, as long as the asynchronous 
     * operation is ongoing 
     */ 
     auto self(shared_from_this()); 
     INF ("id=%d start async_write", getId()); 
     boost::asio::async_write(m_socket, 
      boost::asio::buffer(m_write_msgs.front().data(), 
       m_write_msgs.front().length()), 
       [this, self](boost::system::error_code ec, std::size_t /*length*/) 
       { 
        INF ("id=%d write callback", getId()); 
        if (!ec) 
        { 
         m_write_msgs.pop_front(); 
         if (!m_write_msgs.empty()) 
         { 
          INF ("messages avaliable -> call do_write"); 
          do_write(); 
         } 
        } 
        else 
        { 
         INF ("error on async write"); 
         m_room.leave(shared_from_this()); 
        } 
       }); 
    } 

    tcp::socket m_socket; 
    chat_room& m_room; 
    chat_message m_read_msg; 
    chat_message_queue m_write_msgs; 
}; 

//---------------------------------------------------------------------- 

class chat_server 
{ 
public: 
    chat_server (boost::asio::io_service& io_service, 
        const tcp::endpoint& endpoint) 
    : m_acceptor (io_service, endpoint), 
     m_socket (io_service), 
     m_room() // call for m_room is not explicitly necessary, but 
        // makes the code more readable 
    { 
     INF ("port=%d", endpoint.port()); 
     do_accept(); 
    } 

private: 
    void do_accept() 
    { 
     INF (" called"); 
     m_acceptor.async_accept (m_socket, 
     [this](boost::system::error_code ec) 
     { 
      if (!ec) 
      { 
       // std::make_shared<T>(args) 
       // creates a shared pointer of T and provides the arguments to 
       // the new object! 
       INF (" clients connection: start a new chat session"); 
       std::make_shared<chat_session> (std::move (m_socket), m_room)->start(); 

       // http://stackoverflow.com/questions/3413470/what-is-stdmove-and-when-should-it-be-used 
       // move allows to swap the ressources instead of copying them around! 
      } 

      INF ("wait for the next client to connect"); 
      do_accept(); 
     }); 
    } 

    tcp::acceptor m_acceptor; 
    tcp::socket m_socket; 
    chat_room m_room; 
}; 

//---------------------------------------------------------------------- 

int main(int argc, char* argv[]) 
{ 
    Debug::Instance()->setModuleLevel (DebugIds::TEST, INFO); 
    Debug::Instance()->setModuleLevel (DebugIds::SERVER, DETAIL); 
    Debug::Instance()->setModuleLevel (DebugIds::CLIENT, DETAIL); 

    try 
    { 
     if (argc < 2) 
     { 
      std::cerr << "Usage: chat_server <port> [<port> ...]\n"; 
      return (1); 
     } 
     INF ("argc=", argc); 

     INF ("define io_service"); 
     boost::asio::io_service io_service; 

     std::list<chat_server> servers; 
     for (int i = 1; i < argc; ++i) 
     { 
      INF ("define endpoint"); 
      tcp::endpoint endpoint(tcp::v4(), std::atoi(argv[i])); 
      // create new chat_server by calling its constructor and add 
      // it to the list. emplace_back takes care off the correct 
      // constructor selection 
      INF ("Start chat server at %s:%d", endpoint.address().to_string().c_str(), endpoint.port()); 
      servers.emplace_back(io_service, endpoint); 
     } 

     std::cout << "io_service.stopped()=" << io_service.stopped() << std::endl; 

     INF ("io_service.run()"); 
     boost::system::error_code ec; 
     int size = io_service.run(ec); 
     INF ("io_service.run() [%d] returned with %s", size, ec.message().c_str()); 
    } 
    catch (std::exception& e) 
    { 
     ERR("Exception: %s", e.what()); 
    } 

    INF ("End server application"); 

    return (0); 
} 

답변

2

에서 수신 될 때까지 async_read 호출이 차단된다.

내가해야 REQD 부스트 :: 자세한 ASIO 사양 : 다음 조건 중 하나에 해당하는 때까지

비동기 작업이 계속됩니다
- 제공된 버퍼가 가득합니다. 즉, 전송 된 바이트는 버퍼 크기의 합과 같습니다.
- 오류가 발생했습니다.

그래서 데이터 버퍼 초기화 오류를 수정 한 후 모든 것이 정상적으로 작동했습니다. std :: vector buffer (0,1024); -> std :: vector buffer (1024,0);

0

데이터 수신 전에 std :: vector 버퍼 (doRead에서)가 파괴되었습니다. 부스트 ASIO 버퍼 문서에서

:

비동기 읽거나 당신이 작업에 대한 버퍼가 완료 핸들러 가 호출 될 때까지 유효한지 확인해야합니다 쓰기 호출합니다. 위의 예제에서 버퍼는 std :: string 변수 msg입니다. 이 변수는 스택에 있으므로 비동기 작업이 완료되기 전에 범위를 벗어납니다. 운이 좋으면 응용 프로그램이 충돌하지만 무작위 실패 가능성이 더 높습니다.

클래스에서 버퍼를 전송하거나 shared_ptr 메커니즘을 사용하여 버퍼가 사용 후에 만 ​​파괴되도록 할 수 있습니다. 예 :

void do_read() 
    { 
     std::shared_ptr<std::vector<char>> buf = std::make_shared<std::vector<char>>(1024, 0); 
     m_socket.async_read_some 
     (
      boost::asio::buffer(*buf, 1024), 
      // copy buf. The buffer will be destroyed 
      // only after leaving the lambda function 
      [buf](boost::system::error_code err, std::size_t length) 
      { 
       if(!err) 
       { 
        // use buf 
       } 
       else 
       { 
        // abort 
       } 

       do_read(); 
      } 
     ); 
    }