2017-04-05 6 views
0

std::vector<std::thread> mIOServicePool으로 선언 된 스레드 풀을 사용하는 비동기 boost :: asio TCP 응용 프로그램을 작성했습니다. 이 스레드는 TCP 데이터를 비동기 적으로 읽고 서버에 기록합니다. 다음 코드는 GUI의 시작 누름 단추 이벤트 핸들러에서 가져온 것입니다.완료되지 않은 완료 핸들러가있는 tcp boost asio io_service 다시 시작

// launch multiple asio service threads to 
// handle the protocol instances - effectively 
// thread pooling the ioservice 
//mpIOService->reset(); 
for (auto i=0; i<3; i++) { 
mIOServicePool.emplace_back(
    std::thread([this]() { mpIOService->run(); })); 
} 

코드는 GUI의 MainWindow를 클래스의 멤버로 저장된 mIOServicePool와 Qt를 기반 GUI 응용 프로그램의 일부입니다.

이것은 응용 프로그램을 시작하고 실행 상태로 유지할 때 제대로 작동하지만 백엔드 서버에 대한 연결을 다시 시작하는 동안 문제가 발생하기 시작합니다. 문제는 대부분 GUI 종료 버튼을 눌렀을 때 io_service와 연결된 io_service::work을 재설정 할 때 플러시 된 것으로 생각되는 처리되지 않은 핸들러와 관련이 있습니다. 문제는 메모리 asio :: socket의 스트림 버퍼를 읽는 동안 액세스 위반을 통해 TCP 통신을 시작하려고 할 때 발생합니다. 아래의 스택 추적에서 볼 수 있듯이 읽기 소켓과 관련된 완료 핸들러를 처리하고 있습니다.

app739.exe!boost::asio::basic_streambuf<std::allocator<char> >::commit(unsigned __int64 n) Line 226 C++ 
app739.exe!boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >::operator()(const boost::system::error_code & ec, unsigned __int64 bytes_transferred, int start) Line 624 C++ 
app739.exe!boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64>::operator()() Line 129 C++ 
app739.exe!boost::asio::asio_handler_invoke<boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64> >(boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64> & function, ...) Line 70 C++ 
app739.exe!boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >(boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64> & function, boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > & context) Line 39 C++ 
app739.exe!boost::asio::detail::asio_handler_invoke<boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64>,boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >(boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64> & function, boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > > * this_handler) Line 685 C++ 
app739.exe!boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64>,boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > > >(boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64> & function, boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > > & context) Line 39 C++ 
app739.exe!boost::asio::detail::win_iocp_socket_recv_op<boost::asio::mutable_buffers_1,boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > > >::do_complete(boost::asio::detail::win_iocp_io_service * owner, boost::asio::detail::win_iocp_operation * base, const boost::system::error_code & result_ec, unsigned __int64 bytes_transferred) Line 97 C++ 
app739.exe!boost::asio::detail::win_iocp_operation::complete(boost::asio::detail::win_iocp_io_service & owner, const boost::system::error_code & ec, unsigned __int64 bytes_transferred) Line 47 C++ 
app739.exe!boost::asio::detail::win_iocp_io_service::do_one(bool block, boost::system::error_code & ec) Line 406 C++ 
app739.exe!boost::asio::detail::win_iocp_io_service::run(boost::system::error_code & ec) Line 164 C++ 
app739.exe!boost::asio::io_service::run() Line 59 C++ 
app739.exe!MainWindow::on_pushButtonStart_clicked::__l13::<lambda>() Line 943 C++ 

answer 문제가 io_service.reset()와 함께해야 할 수 있음을 나타냅니다. 액세스 위반시 스택 추적은 스레드가 asio 완료 핸들러를 처리하고 있음을 나타냅니다. 이 문제를 해결하는 열쇠는 io_service.stop()io_service.reset()boost::asio::io_service 개체에 올바르게 시퀀스하는 것입니다. 또한 io_service를 중지하거나 전초 작업 개체를 다시 설정하기 전에 소켓을 다시 설정해야 할 수도 있습니다.

아래 코드는 코드를 디버깅하는 동안 io_service 스레드를 중지하는 방법을 보여줍니다. 모든 스레드가 조인을 완료하는 것을 볼 수 있습니다. 그래서 완료 핸들러가 아웃 건으로 된 이유를 알 수 없습니다.

void 
MainWindow::stopSys() 
{ 
    // make sure that we have no more work keeping services alive 
    mpWork.reset(); 
    // check to see if the protocol threads were started 
    if (mVCDUProtocol) { 
     // terminate protocol thread by setting the shared mShutdown atomic flag 
     mVCDUProtocol->shutdown(); 
     // Once each thread sees the shutdown flag, it will cleanly 
     // terminate so we can call join here to wait for the entire 
     // pool to finish 
     std::for_each(mIOServicePool.begin(), mIOServicePool.end(), 
      [](std::thread& rNext) { 
       rNext.join(); 
      }); 
     mIOServicePool.clear(); 
    } 
} 

아래에 표시된 내 코드는 매우 간단합니다. 그것은 람다 핸들러 내에서 처리되는 비동기 해결을 시작합니다. 거기에서 start_async_ops(endPointIter)을 호출하여 비동기식 connect()을 호출하고이 람다 코드는 서버로부터 데이터를 기다리는 boost::asio::async_read_until()을 수행하는 VCDUProtocol::do_read()을 호출합니다.

void 
VCDUProtocol::prosimAsyncIOThreadFn() 
{ 
    static auto& gLogger = gpLogger->getLoggerRef(
     gUseSysLog ? Logger::LogDest::SysLog : 
     Logger::LogDest::EventLog); 
    try { 
     // convert the host-name/port to a usable endpoint 
     tcp::resolver resolver(*mpIOService); 
     tcp::resolver::query query(mProtocolConfig.getProsimHostName(), 
      std::to_string(mProtocolConfig.getProsimPortNum())); 
     const auto endPointIter = std::find_if(
      resolver.resolve(query), tcp::resolver::iterator(), 
      [](const tcp::endpoint& next) { 
       return next.protocol() == tcp::v4(); 
      }); 
     if (endPointIter != tcp::resolver::iterator()) { 
      mpSocket = std::make_unique<tcp::socket>(*mpIOService); 
      mpSocketTimer = std::make_unique<deadline_timer>(*mpIOService); 
      start_async_ops(endPointIter); 
     } 
    } catch (std::exception& rEx) { 
     LOG_ERROR(gLogger, gChannel) << boost::format(
      "%1%: %2%") 
      % __FUNCTION__ 
      % rEx.what(); 
    } 
} 

void 
VCDUProtocol::start_async_ops(tcp::resolver::iterator endpoint_iter) 
{ 
    // Start the connect actor. 
    do_connect(endpoint_iter); 

    // Start the deadline actor. You will note that we're not setting any 
    // particular deadline here. Instead, the connect and input actors will 
    // update the deadline prior to each asynchronous operation. 
    mpSocketTimer->async_wait(boost::bind(
     &VCDUProtocol::check_deadline, this, _1)); 
} 

void 
VCDUProtocol::do_connect(
    tcp::resolver::iterator endpoint_iter) 
{ 
    if (endpoint_iter != tcp::resolver::iterator()) { 
     // Set a deadline for the connect operation to complete. 
     mpSocketTimer->expires_from_now(boost::posix_time::seconds(5)); 
     boost::asio::async_connect(*mpSocket, endpoint_iter, 
      [this](boost::system::error_code ec, tcp::resolver::iterator) { 
       if (!mShutdownFlag && !ec) { 
        // successfully connected here - cancel the 
        // connect timer and kick off async write ops 
        mpSocketTimer->cancel(); 
        // kick off the prosim read operation 
        do_read(); 
       } 
      }); 
    } else { 
     // No more endpoints. Close the socket. 
     shutdown(); 
    } 
} 

void 
VCDUProtocol::do_read() 
{ 
    // Start or continue an asynchronous line reads. This will read at least 
    // up to a carriage return or line feed 
    async_read_until(*mpSocket, *mTLS->mSocketStreamBuf, "\r\n", 
     boost::bind(&VCDUProtocol::handle_read, this, _1)); 
} 

이것은 비동기 읽기 완료 핸들러 - 이것은 취소 할 필요가 나는 완료 핸들러가 호출되지 않습니다 단순히 소켓을 닫는 것은 불충분 어딘가에 읽었다. 나는 취소를 불러야한다. 되지 않은 완료 핸들러

/** 
* Asynchronous read callback. 
* 
* @param ec  [in] Boost ASIO library error code. 
*/ 
void 
VCDUProtocol::handle_read(const boost::system::error_code& ec) 
{ 
    static auto& gLogger = gpLogger->getLoggerRef(
     gUseSysLog ? Logger::LogDest::SysLog : 
     Logger::LogDest::EventLog); 
    if (!mShutdownFlag) { 
     if (!ec) { 
      // Extract the newline-delimited message from the buffer. 
      std::string line; 
      std::istream is(mTLS->mSocketStreamBuf.get()); 
      while (std::getline(is, line)) { 
       // Critical Section 
       std::lock_guard<std::mutex> lock (gMutexGuard); 
       // handle partial line reads 
       if (is.eof()) { 
        mTLS->mPartialLine = std::move(line); 
        continue; 
       } else if (!mTLS->mPartialLine.empty()) { 
        line = std::move(mTLS->mPartialLine) + line; 
       } 
       . . . 
       // update GUI 
       mpListener->handlePageUpdate(
        mProtocolConfig.getCduID(), 
        mTLS->mVCDUPage, bRefreshCDU); 
       } 
       // not really required 
       line.clear(); 
      } 
      // keep reading 
      do_read(); 
     } else { 
      LOG_ERROR(gLogger, gChannel) << boost::format(
       "CDU_%1%: handle_read - error[%2%]") 
       % mProtocolConfig.getCduID() 
       % ec.message(); 
      shutdown(); 
     } 
    } 
} 

답변

1

다시 시작 TCP 부스트 ASIO의 io_service 불가능

AFAICT.

run()을 다시 호출하기 전에 설명서에 reset()을 호출해야한다고 명시되어 있습니다.

나는 유일한 실용적인 대안이 예를 들어 자신의 이벤트 루프를 만드는 것이라고 생각합니다. poll_one()을 사용하면 처음부터 서비스를 중지하지 않아도됩니다.

이것은 비동기 읽기 완료 핸들러입니다.이 핸들은 취소가 필요합니다. 완성 핸들러가 호출되지 않기 때문에 소켓을 닫는 것만으로는 충분하지 않습니다.

사실이 아닙니다. 소켓을 취소하면 비행 중 작업이 취소되고 이되어 완료 핸들러가 ec == operation_aborted과 함께 호출됩니다. 소켓을 닫으면 bad_socket 같은 다른 오류 코드가 발생할 수 있습니다.

+0

감사합니다. 그러나 이것에 대한 대부분의 시간을 보냈습니다. 해결책을 찾지 못했을 것 같습니다. - 그것도 의미가 있습니다. 기본적으로 트릭이 정상적으로 취소되고, 종료되고, 소켓을 닫은 것처럼 보입니다. 감시병 작업 객체. 이것은 handle_read 핸들러가 오류 (이 경우에는 '스레드 종료 또는 응용 프로그램 요청으로 인해 I/O 작업이 중단되었습니다')로 깨어 난 다음 다시 시작할 수 있음을 지적했습니다. – johnco3

+0

또한 작업 센티넬에서 reset을 호출 한 후 io_service를 재설정하여 완전히 종료하고 (필요하면 스레드 풀이 바로 전에 합류했는지는 확실하지 않음) 다시 시작하기 전에 io_service가 재설정되었는지 확인했습니다 run()에서 즉시 멈추고 종료), io_service를 사용하여 감시 카메라를 다시 초기화하십시오. – johnco3

관련 문제