2014-10-10 2 views
0

boost :: asio :: io_service.post()에 내 메서드 처리기를 호출하지 않는 문제가 있습니다. 간단한 클라이언트 및 서버 C++ 응용 프로그램을 사용하여 둘 다 TCPClient 클래스에서 동일한 코드를 사용합니다. 클라이언트 측은 정상적으로 작동하지만 accept를 사용하여 채워진 클래스의 인스턴스는 작동하지 않습니다.TCPClient boost :: asio :: io_service 게시하지 않음

내 전체 프로젝트를 here까지 올렸지 만 관련 코드를 아래에 넣었습니다. 이 라인

io_service.post(boost::bind(&TCPClient::DoWrite, this, msg)); 

가 호출되는하여 TcpClient :: Write 메서드하지만 핸들러 (하여 TcpClient :: DoWrite)에서

은 서버 측에서 호출되지 않습니다.

동일한 TCPClient의 내 async_reads가 정상적으로 작동하기 때문에 IO_Service가 실행되고 있음을 알고 있습니다.

이 나의하여 TcpClient 클래스

.HPP 파일입니다

class TCPClient 
    : public boost::enable_shared_from_this<TCPClient> 
{ 
    public: 
     typedef boost::shared_ptr<TCPClient> pointer; 

    private: 
     boost::asio::io_service io_service; 

     bool m_IsConnected; 
     bool m_HeartbeatEnabled; 

     boost::asio::ip::tcp::socket m_Socket; 
     boost::asio::ip::tcp::endpoint m_Endpoint; 

     boost::asio::steady_timer m_HeartBeatTimer; 
     boost::asio::streambuf m_Buffer; 
     std::string m_Delimiter; 
     std::deque<std::string> m_Messages; 
     bool m_HeartBeatEnabled; 
     int m_HeartBeatTime; 

    private: 
     void HandleConnect(const boost::system::error_code& error); 
     void DoHeartBeat(const boost::system::error_code& error); 
     void DoWrite(const std::string &msg); 
     void HandleWrite(const boost::system::error_code& error); 
     void HandleRead(const boost::system::error_code& error); 

    public: 
     TCPClient(boost::asio::io_service &io_service); 
     TCPClient(bool enableHeartbeat); 
     ~TCPClient(); 
     void Close(); 
     void ConnectToServer(boost::asio::ip::tcp::endpoint& endpoint); 
     void ConnectToServer(const std::string &ip, const std::string &protocol); 
     void ConnectToServer(const std::string &ip, unsigned short port); 
     void Write(const std::string &msg); 
     void StartRead(); 
     void SetHeartBeatTime(int time); 
     boost::asio::ip::tcp::socket& Socket(); 
     boost::asio::io_service& Service(); 
     static pointer Create(boost::asio::io_service& io_service); 

    public: 
     // signals 
     boost::signals2::signal<void(const boost::asio::ip::tcp::endpoint&)> sConnected; 
     boost::signals2::signal<void(const boost::asio::ip::tcp::endpoint&)> sDisconnected;  
     boost::signals2::signal<void(const std::string&)>      sMessage; 
}; 

using boost::asio::ip::tcp; 

TCPClient::pointer TCPClient::Create(boost::asio::io_service& io) 
{ 
    return pointer(new TCPClient(io)); 
} 

TCPClient::TCPClient(boost::asio::io_service& io) 
    : m_IsConnected(true), m_Socket(io), m_HeartBeatTimer(io), m_Delimiter(), m_HeartBeatTime(10) 
{ 
    m_Delimiter = "\n"; 
    m_HeartbeatEnabled = false; 
    // start heartbeat timer (optional) 
    if(m_HeartBeatEnabled) 
    { 
     m_HeartBeatTimer.expires_from_now(boost::chrono::seconds(m_HeartBeatTime)); 
     m_HeartBeatTimer.async_wait(boost::bind(&TCPClient::DoHeartBeat, this, boost::asio::placeholders::error)); 
    } 
} 

TCPClient::TCPClient(bool enableHeartBeat) 
    : m_IsConnected(false), m_Socket(io_service), m_HeartBeatTimer(io_service), m_Delimiter(), m_HeartBeatTime(10) 
{ 
    m_Delimiter = "\n"; 
    m_HeartbeatEnabled = enableHeartBeat; 
} 

TCPClient::TCPClient::~TCPClient() 
{ 
} 

void TCPClient::Close() 
{ 
    io_service.stop(); 
    m_Socket.close(); 
} 

boost::asio::ip::tcp::socket& TCPClient::Socket() 
{ 
    return m_Socket; 
} 

boost::asio::io_service& TCPClient::Service() 
{ 
    return io_service; 
} 

void TCPClient::ConnectToServer(const std::string &ip, unsigned short port) 
{ 
    try { 
     boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::address::from_string(ip), port); 
     ConnectToServer(endpoint); 
    } 
    catch(const std::exception &e) { 
     std::cout << "Error: " << e.what() << std::endl; 
    } 
} 

void TCPClient::ConnectToServer(const std::string &url, const std::string &protocol) 
{ 
    // You can also explicitly pass a port, like "8080" 
    boost::asio::ip::tcp::resolver::query query(url, protocol); 
    boost::asio::ip::tcp::resolver resolver(io_service); 
    try { 
     boost::asio::ip::tcp::resolver::iterator destination = resolver.resolve(query); 
     boost::asio::ip::tcp::endpoint endpoint; 
     while (destination != boost::asio::ip::tcp::resolver::iterator()) 
      endpoint = *destination++; 

     ConnectToServer(endpoint); 
    } 
    catch(const std::exception &e) { 
     std::cout << "Error: " << e.what() << std::endl; 
    } 
} 

void TCPClient::ConnectToServer(boost::asio::ip::tcp::endpoint& endpoint) 
{ 
    m_Endpoint = endpoint; 

    std::cout << "Trying to connect to port " << endpoint << std::endl; 

    // try to connect, then call handle_connect 
    m_Socket.async_connect(m_Endpoint, 
     boost::bind(&TCPClient::HandleConnect, this, boost::asio::placeholders::error)); 

    //start processing messages 
    io_service.run(); 
} 

void TCPClient::Write(const std::string &msg) 
{ 
    if(!m_IsConnected) return; 
    std::cout << "write: " << msg << std::endl; 
    // safe way to request the client to write a message 
    io_service.post(boost::bind(&TCPClient::DoWrite, this, msg)); 
} 

void TCPClient::StartRead() 
{ 
    if(!m_IsConnected) return; 

    // wait for a message to arrive, then call handle_read 
    boost::asio::async_read_until(m_Socket, m_Buffer, m_Delimiter, 
      boost::bind(&TCPClient::HandleRead, this, boost::asio::placeholders::error)); 
} 


void TCPClient::HandleRead(const boost::system::error_code& error) 
{ 
    if (!error) 
    { 
     std::string msg; 
     std::istream is(&m_Buffer); 
     std::getline(is, msg); 

     if(msg.empty()) return; 

     //cout << "Server message:" << msg << std::endl; 

     // TODO: you could do some message processing here, like breaking it up 
     //  into smaller parts, rejecting unknown messages or handling the message protocol 

     // create signal to notify listeners 
     sMessage(msg); 

     // restart heartbeat timer (optional) 
     if(m_HeartBeatEnabled) 
     { 
      m_HeartBeatTimer.expires_from_now(boost::chrono::seconds(m_HeartBeatTime)); 
      m_HeartBeatTimer.async_wait(boost::bind(&TCPClient::DoHeartBeat, this, boost::asio::placeholders::error)); 
     } 

     // wait for the next message 
     StartRead(); 
    } 
    else 
    { 
     // try to reconnect if external host disconnects 
     if(error.value() != 0) { 
      m_IsConnected = false; 

      // let listeners know 
      sDisconnected(m_Endpoint); 

      // cancel timers 
      m_HeartBeatTimer.cancel(); 
     } 
     //else 
      //do_close(); 
    } 
} 

void TCPClient::HandleWrite(const boost::system::error_code& error) 
{ 
    if(!error) 
    { 
     // write next message 
     m_Messages.pop_front(); 
     if (!m_Messages.empty()) 
     { 
      std::cout << "Client message:" << m_Messages.front() << std::endl; 

      boost::asio::async_write(m_Socket, 
       boost::asio::buffer(m_Messages.front()), 
       boost::bind(&TCPClient::HandleWrite, this, boost::asio::placeholders::error)); 
     } 
     if(m_HeartBeatEnabled) 
     { 
      // restart heartbeat timer (optional) 
      m_HeartBeatTimer.expires_from_now(boost::chrono::seconds(m_HeartBeatTime)); 
      m_HeartBeatTimer.async_wait(boost::bind(&TCPClient::DoHeartBeat, this, boost::asio::placeholders::error)); 
     } 
    } 
    else 
    { 
     std::cout << "HandleWrite Error: " << error << std::endl; 
    } 
} 

void TCPClient::DoWrite(const std::string &msg) 
{ 
    if(!m_IsConnected) return; 

    bool write_in_progress = !m_Messages.empty(); 
    m_Messages.push_back(msg + m_Delimiter); 

    if (!write_in_progress) 
    { 
     std::cout << "Client message2: " << m_Messages.front() << std::endl; 

     boost::asio::async_write(m_Socket, 
      boost::asio::buffer(m_Messages.front()), 
      boost::bind(&TCPClient::HandleWrite, this, boost::asio::placeholders::error)); 
    } 
    else 
    { 
     std::cout << "DoWrite write_in_progress: " << msg << std::endl; 
    } 
} 

void TCPClient::HandleConnect(const boost::system::error_code& error) 
{ 
    if (!error) { 
     // we are connected! 
     m_IsConnected = true; 

     // let listeners know 
     sConnected(m_Endpoint); 

     // start heartbeat timer (optional) 
     if(m_HeartBeatEnabled) 
     { 
      m_HeartBeatTimer.expires_from_now(boost::chrono::seconds(m_HeartBeatTime)); 
      m_HeartBeatTimer.async_wait(boost::bind(&TCPClient::DoHeartBeat, this, boost::asio::placeholders::error)); 
     } 

     // await the first message 
     StartRead(); 
    } 
    else { 
     // there was an error :(
     m_IsConnected = false; 

     std::cout << "Server error:" << error.message() << std::endl; 
    } 
} 

void TCPClient::DoHeartBeat(const boost::system::error_code& error) 
{ 
    // here you can regularly send a message to the server to keep the connection alive, 
    // I usualy send a PING and then the server replies with a PONG 
    if(!error) Write("PING"); 
} 

void TCPClient::SetHeartBeatTime(int time) 
{ 
    m_HeartBeatTime = time; 
    m_HeartBeatEnabled = true; 
    m_HeartBeatTimer.expires_from_now(boost::chrono::seconds(m_HeartBeatTime)); 
    m_HeartBeatTimer.async_wait(boost::bind(&TCPClient::DoHeartBeat, this, boost::asio::placeholders::error)); 
} 

.cpp 파일 내 tcpserver는을 사용하여 연결을 허용

.HPP 파일

class TCPServer 
{ 
    private: 
     boost::asio::io_service io_service; 
     boost::asio::ip::tcp::acceptor m_acceptor; 

    public: 
     TCPServer(int port); 
     ~TCPServer(); 
     void Close(); 
     void StartAccept(); 

    private: 
     void HandleAccept(TCPClient::pointer new_connection, const boost::system::error_code& error); 

    public: 
     boost::signals2::signal<void(const TCPClient::pointer&)> sig_NewClient; 
}; 

의 .cpp 내가 밀어 매우 새로운 오전 (등 정상의 C#, 자바) C++와 너무 많은 일을하지 않는

TCPServer::TCPServer(int port) 
    : m_acceptor(io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)) 
{ 
} 

TCPServer::TCPServer::~TCPServer() 
{ 
} 

void TCPServer::Close() 
{ 
    m_acceptor.close(); 
    io_service.stop(); 
} 

void TCPServer::StartAccept() 
{ 
    TCPClient::pointer new_connection = TCPClient::Create(io_service); 

    m_acceptor.async_accept(new_connection->Socket(), 
     boost::bind(&TCPServer::HandleAccept, this, new_connection, boost::asio::placeholders::error)); 

    io_service.run(); 

    std::cout << "Run ended for server " << std::endl; 
} 

void TCPServer::HandleAccept(TCPClient::pointer new_connection, const boost::system::error_code& error) 
{ 
    if (!error) 
    { 
     sig_NewClient(new_connection); 
     StartAccept(); 
    } 
} 

파일 그래서 내가 근본적인 뭔가를 놓친 거지하지만 난 할 수없는 가정 문제를 찾으십시오.

도우 흐름
서버
tcpserver는
서버 만들기 - PING을받을 때 StartAccept()가 생성되는하여 TcpClient 인스턴스에 StartRead 전화 새로운 연결에

이 때여보세요
올레 쓰기 수신 PONG

Cl ient
연결 서버
에 는
는 PING에게

클라이언트 수신 10 초마다 보내 미세 서버 잘받은 네트워크에 기록하지만 쓰기는 DoWrite 수 있도록 결코 또는 HandleWrite 방법여보세요 보내

추가 정보가 필요하면 알려주십시오.몇 가지 문제가 사전

답변

0

감사합니다, 그들 중 일부는 내가 볼 수 있습니다

  • 당신이 그나마 때문에 io_service::worker 당신의 io_service.run() 활성 핸들러가 없을 때 중지됩니다. 당신의 TCPClient::DoWrite가 호출 될 때, 데이터가 이미 파괴 할 수

  • 하신 post()에 소켓 쓰기위한 작업을 시도 TCPClient::Write하지만 std::string에 대한 참조를 전달합니다.

기본적인 C++ 및 boost :: asio 사용 문제가 있으므로 더 간단한 구현으로 시작할 가치가 있다고 생각합니다.

+0

쓰기 체인에 대한 버퍼 수명이 나에게 안전한 같습니다 대신, 동의 콜 체인을 시작하고 io_service을 실행하지만 비동기 호출 체인 루프의 일부가 아닌 진입 점을 추가하는 것이 좋습니다. 'TCPClient :: Write()'에서, 참조는 값에 의해'boost :: bind()'펑터에 복사 될 것입니다. 'TCPClient :: DoWrite()'CompletionHandler는 참조에 의한 카피를 참조하고 그것을 deque에 복사하여 버퍼에 deque 엘리먼트를 사용하여 async_write() 오퍼레이션을 시작한다. 기본 메모리는 async_write()의 완료 핸들러 ('TCPClient :: HandleWrite()') 내에서 해제된다. –

+0

io_service.run() 전에 호출 된 async_accept() 및 핸들 호출 수 async_accept를 다시 호출하기 때문에 io_service가 항상 작업해야하기 때문에 제 이해가됩니다. bind에 대한 doco를 보면 "bind가 취하는 인수는 반환 된 함수 객체에 의해 내부적으로 복사되고 유지됩니다."라고되어 있습니다. 이것은 내 원래 개체가 범위를 벗어나 바인드에 사용 된 값이 복사본이되고 따라서 여전히 유효하다는 것을 의미한다고해도 어떨까요? – Lepon

0

TCPServer은 현재 같은 io_service 개체의 이벤트 루프를 서비스 스레드 내에서 io_service.run()를 호출 할 때 호출 체인이 정의되지 않은 동작을 초래하는 io_service 요구 사항을 위반하고 동의합니다. documentation 상태 다음 run() 기능은 현재 같은 io_service 개체에 대한 run() 중 하나 run_one(), poll() 또는 poll_one()를 호출 스레드에서 호출해서는 안

. 다음 같은 io_serviceio_service.run()를 호출하는 TCPServer 코드에서

, 요구 사항이 io_service.run() 내에서 스레드에 의해 호출 HandleAccept() 완료 핸들러, StartAccept()를 호출 할 때 위반 :

   .------------------------------------. 
       V         | 
void TCPServer::StartAccept()      | 
{             | 
    m_acceptor.async_accept(..., &HandleAccept); --. | 
    io_service.run();        | | 
}             | | 
       .---------------------------------' | 
       V         | 
void TCPServer::HandleAccept(...)     | 
{             | 
    if (!error)          | 
    {             | 
    StartAccept(); ---------------------------------' 
    } 
} 

것은 해결하려면 이 경우 완료 핸들러 내에서 io_service.run()을 호출하지 마십시오.

void TCPServer::StartAccept() 
{ 
    DoStartAccept(); ---------------------------------. 
    io_service.run();         | 
}             | 
     .---------------------------------------------' 
     |  .------------------------------------. 
     V  V         | 
void TCPServer::DoStartAccept()      | 
{             | 
    m_acceptor.async_accept(..., &HandleAccept); --. | 
}             | | 
       .---------------------------------' | 
       V         | 
void TCPServer::HandleAccept(...)     | 
{             | 
    if (!error)          | 
    {             | 
    DoStartAccept(); -------------------------------' 
    } 
} 
+0

감사합니다. 제안 된대로 코드를 수정했습니다. 그 것이 확실하게 하나의 버그를 고쳤지 만 나는 여전히 쓰기 문제가 있습니다. – Lepon

관련 문제