2013-03-15 2 views
6

내 응용 프로그램에 대한 비동기 부스트 udp/TCP 소켓 작업 대 동기를 사용하여 평가하려고합니다. 내 디자인과 비슷한 예제를 찾으려고했지만 올바른 경로가 아니더라도 비동기 옵셋을 내 디자인에 적용하려고 할 수도 있다는 사실을 알게하는 것을 찾지 못했습니다.부스트 asio - 다른 서버에 여러 클라이언트 연결

여러 개의 (읽기 : 1 ~ 10 사이) 서버에 연결하고 다른 프로토콜을 사용하여 통신하고 싶습니다. 나는이 서버 연결 중 하나에 통신해야하는 데이터를 생성하는 4-5 개의 스레드를 가지고 있습니다.

현재 디자인은 동기식이며 서버 연결 스레드 당 io_service 개체를 사용하고 생성중인 스레드와 각 연결 스레드 사이에 스레드 안전 큐를 사용합니다.

이 디자인은 처리량 성능 측면에서 확장 성이없는 것처럼 보입니다.이 부분을 최대화하고 싶습니다.

이 여러 연결을 다른 서버 패턴에 제공하는 예가 있습니까?

답변

2

ASIO로 구현 된 TCP/IP SSL/TLS를 사용하여 6 개의 다른 서버에 연결하기 위해 클라이언트를 작성했습니다. 모두 6 동일한 프로토콜을 사용합니다.

:

SSLSocket.H

#pragma once 

#include <cstdlib> 
#include <iostream> 
#include <queue> 
#include <boost/bind.hpp> 
#include <boost/asio.hpp> 
#include <boost/asio/ssl.hpp> 
#include <boost/thread.hpp> 
#include <boost/thread/mutex.hpp> 
#include <boost/shared_ptr.hpp> 
using namespace std; 
// 
#include "BufferManagement.h" 
#include "Logger.h" 
#include "Common Classes\Locking.h" 
#include "Message.h" 

class SSLSocket; 
class ConcurrentMsgQueue; 

#define BOOST_ASIO_ENABLE_HANDLER_TRACKING 

typedef void (__stdcall *Callback)(const SSLSocket* pSSLS, const int bytesInMsg, const void* pBuf); 

// typedef std::vector<boost::asio::ssl::stream<boost::asio::ip::tcp::socket> SocketVectorType; 

enum {MsgLenBytes = 4}; 

class SSLSocket 
{ 
    // This class handles all communications between the client and the server 
    // using TCP/IP SSL v1. The Boost ASIO (Asynchronous I/O) library is used to accomplish this. 
    // Initally written by Bob Bryan on 1/21/2013. 
    // 
public: 
    SSLSocket(const bool logToFile, const bool logToConsole, const bool displayInHex, const LogLevel levelOfLog, const string& logFileName, const int bufMangLen); 
    ~SSLSocket(); 
    void Connect(SSLSocket* psSLS, const string& serverPath, string& port); 
    void SendToServer(const int bytesInMsg, Byte* pBuf); 
    void Stop(); 

    static void SetCallback(Callback callbackFunction) 
    { 
     // This method is required in order to be able to do a reverse pinvoke from C#. 
     // This callback function pointer is what is used to communicate back to the C# code. 
     CallbackFunction = callbackFunction; 
    } 

    static Byte* AllocateMem(int length) 
    { 
     // Allocate some memory. This method winds up getting called when the C# client needs to allocate some memory for a message. 
     Byte* pBuf = BufMang.GetPtr(length); 
     return pBuf; 
    } 
    // 
    static Logger Log; // Object used to log info to a file and/or to the console. 
    static Callback CallbackFunction; // Callback function object used to communicate with the worker thread in C#. 

private: 
    void InitAsynchIO(); 
    void HandleConnect(const boost::system::error_code& error); 
    void HandleHandshake(const boost::system::error_code& error); 
    void HandleFirstWrite(const boost::system::error_code& error, size_t bytes_transferred); 
    void HandleRead(const boost::system::error_code& error, size_t bytesTransferred); 
    // void HandleRead(const boost::system::error_code& error, size_t bytes_transferred); 
    void Terminate(); 
    void static RcvWorkerThread(SSLSocket* sSLS); 
    void static SendWorkerThread(SSLSocket* psSLS); 
    void ProcessSendRequests(); 
    void HandleWrite(const boost::system::error_code& error, size_t bytesTransferred); 
    static void WorkerThread(boost::shared_ptr<boost::asio::io_service> io_service); 
    // 
    struct Bytes 
    { 
     // Used to convert 4 bytes to an int. 
     unsigned char B1; 
     unsigned char B2; 
     unsigned char B3; 
     unsigned char B4; 
    }; 

    union Bytes4ToInt 
    { 
     // Converts 4 bytes to an int. 
     int IntVal; 
     Bytes B; 
    }; 

    inline int BytesToInt(const Byte * pBuf) 
    { 
     // This method converts 4 bytes from an array of bytes to a 4-byte int. 
     B2I.B.B1 = *pBuf++; 
     B2I.B.B2 = *pBuf++; 
     B2I.B.B3 = *pBuf++; 
     B2I.B.B4 = *pBuf; 
     int Value = B2I.IntVal; 
     return Value; 
    } 
    // 
    boost::thread_group WorkerThreads; // Used to handle creating threads. 
    CRITICAL_SECTION SocketLock; // Used in conjuction with the Locking object to handle single threading the code. 
    boost::asio::ssl::stream<boost::asio::ip::tcp::socket>* pSocket; // Pointer to the socket object. 
    Bytes4ToInt B2I; // Used to translate 4 bytes in the buffer to an int representing the number of bytes in the msg. 
    std::string sClientIp; // Client IP address. Used for logging. 
    unsigned short uiClientPort; // Port number. Used for logging. 
    // static MessageList* pRepMsgs; // Link list of the msgs to send to the server. 
    Byte* pDataBuf; // Pointer to the data for the current message to be read. 
    static boost::shared_ptr<boost::asio::io_service> IOService; // Object required for use by ASIO to perform certain functions. 
    static bool RcvThreadCreated; // Set when the rcv thread is created so that it won't try to create it again. 
    static int StaticInit; // Indicates whether or not the static members have been initialized or not. 
    static bool DisplayInHex; // Specifies whether to display a buffer in hex or not. 
    static BufferManagement BufMang; // Smart pointer to the buffer used to handle requests coming to and from the server for all sockets. 
    volatile static bool ReqAlive; // Used to indicate whether the request thread should die or not. 
    // static bool RepAlive; // Used to indicate whether the response thread should die or not. 
    static ConcurrentMsgQueue SendMsgQ; // Holds the messages waiting to be sent to the server. 
    static HANDLE hEvent; // Used for signalling between threads. 
}; 

그래서, 여기에 핵심 포인트입니다

#include "StdAfx.h" 
#include "SSLSocket.h" 

boost::shared_ptr<boost::asio::io_service> SSLSocket::IOService; 
int SSLSocket::StaticInit = 0; 
Callback SSLSocket::CallbackFunction; 
BufferManagement SSLSocket::BufMang; 
volatile bool SSLSocket::ReqAlive = true; 
Logger SSLSocket::Log; 
HANDLE SSLSocket::hEvent; 
bool SSLSocket::DisplayInHex; 
ConcurrentMsgQueue SSLSocket::SendMsgQ; 
bool SSLSocket::RcvThreadCreated = 0; 
BufferManagement* Message::pBufMang; 

SSLSocket::SSLSocket(const bool logToFile, const bool logToConsole, const bool displayInHex, 
    const LogLevel levelOfLog, const string& logFileName, const int bufMangLen) : pSocket(0) 
{ 
    // SSLSocket Constructor. 
    // If the static members have not been intialized yet, then initialize them. 
    if (!StaticInit) 
    { 
     DisplayInHex = displayInHex; 
     BufMang.Init(bufMangLen); 
     Message::SetBufMang(&BufMang); 
     // This constructor enables logging according to the vars passed in. 
     Log.Init(logToFile, logToConsole, levelOfLog, logFileName); 
     // Create the crit section object 
     // Locking::InitLocking(ReadLock); 
     // Locking::InitLocking(WriteLock); 
     StaticInit++; 
     hEvent = CreateEvent(NULL, false, false, NULL); 
     // Define the ASIO IO service object. 
     // IOService = new boost::shared_ptr<boost::asio::io_service>(new boost::asio::io_service); 
     boost::shared_ptr<boost::asio::io_service> IOServ(new boost::asio::io_service); 
     IOService = IOServ; 
    } 
} 

SSLSocket::~SSLSocket(void) 
{ 
    delete pSocket; 
    if (--StaticInit == 0) 
     CloseHandle(hEvent); 
} 

void SSLSocket::Connect(SSLSocket* psSLS, const string& serverPath, string& port) 
{ 
    // Connects to the server. 
    // serverPath - specifies the path to the server. Can be either an ip address or url. 
    // port - port server is listening on. 
    // 
    try 
    { 
     Locking CodeLock(SocketLock); // Single thread the code. 
     // If the user has tried to connect before, then make sure everything is clean before trying to do so again. 
     if (pSocket) 
     { 
     delete pSocket; 
     pSocket = 0; 
     }                         
     // If serverPath is a URL, then resolve the address. 
     // Note that this code expects the first server to always have a url. 
     if ((serverPath[0] < '0') || (serverPath[0] > '9')) // Assumes that the first char of the server path is not a number when resolving to an ip addr. 
     { 
     // Create the resolver and query objects to resolve the host name in serverPath to an ip address. 
     boost::asio::ip::tcp::resolver resolver(*IOService); 
     boost::asio::ip::tcp::resolver::query query(serverPath, port); 
     boost::asio::ip::tcp::resolver::iterator EndpointIterator = resolver.resolve(query); 
     // Set up an SSL context. 
     boost::asio::ssl::context ctx(*IOService, boost::asio::ssl::context::tlsv1_client); 
     // Specify to not verify the server certificiate right now. 
     ctx.set_verify_mode(boost::asio::ssl::context::verify_none); 
     // Init the socket object used to initially communicate with the server. 
     pSocket = new boost::asio::ssl::stream<boost::asio::ip::tcp::socket>(*IOService, ctx); 
     // 
     // The thread we are on now, is most likely the user interface thread. Create a thread to handle all incoming socket work messages. 
     if (!RcvThreadCreated) 
     { 
      WorkerThreads.create_thread(boost::bind(&SSLSocket::RcvWorkerThread, this)); 
      RcvThreadCreated = true; 
      WorkerThreads.create_thread(boost::bind(&SSLSocket::SendWorkerThread, this)); 
     } 
     // Try to connect to the server. Note - add timeout logic at some point. 
     boost::asio::async_connect(pSocket->lowest_layer(), EndpointIterator, 
      boost::bind(&SSLSocket::HandleConnect, this, boost::asio::placeholders::error)); 
     } 
     else 
     { 
     // serverPath is an ip address, so try to connect using that. 
     // 
     // Create an endpoint with the specified ip address. 
     const boost::asio::ip::address IP(boost::asio::ip::address::from_string(serverPath)); 
     int iport = atoi(port.c_str()); 
     const boost::asio::ip::tcp::endpoint EP(IP, iport); 
     // Set up an SSL context. 
     boost::asio::ssl::context ctx(*IOService, boost::asio::ssl::context::tlsv1_client); 
     // Specify to not verify the server certificiate right now. 
     ctx.set_verify_mode(boost::asio::ssl::context::verify_none); 
     // Init the socket object used to initially communicate with the server. 
     pSocket = new boost::asio::ssl::stream<boost::asio::ip::tcp::socket>(*IOService, ctx); 
     // 
     // Try to connect to the server. Note - add timeout logic at some point. 
     //pSocket->core_.engine_.do_connect(void*, int); 
     // pSocket->next_layer_.async_connect(EP, &SSLSocket::HandleConnect) 
     // pSocket->next_layer().async_connect(EP, &SSLSocket::HandleConnect); 
     boost::system::error_code EC; 
     pSocket->next_layer().connect(EP, EC); 
     if (EC) 
     { 
      // Log an error. This worker thread should exit gracefully after this. 
      stringstream ss; 
      ss << "SSLSocket::Connect: connect failed to " << sClientIp << " : " << uiClientPort << ". Error: " << EC.message() + ".\n"; 
      Log.LogString(ss.str(), LogError); 
     } 
     HandleConnect(EC); 
     // boost::asio::async_connect(pSocket->lowest_layer(), EP, 
     // boost::bind(&SSLSocket::HandleConnect, this, boost::asio::placeholders::error)); 
     } 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::Connect: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::SendToServer(const int bytesInMsg, Byte* pBuf) 
{ 
    // This method creates a msg object and saves it in the SendMsgQ object. 
    // sends the number of bytes specified by bytesInMsg in pBuf to the server. 
    // 
    Message* pMsg = Message::GetMsg(this, bytesInMsg, pBuf); 
    SendMsgQ.Push(pMsg); 
    // Signal the send worker thread to wake up and send the msg to the server. 
    SetEvent(hEvent); 
} 


void SSLSocket::SendWorkerThread(SSLSocket* psSLS) 
{ 
    // This thread method that gets called to process the messages to be sent to the server. 
    // 
    // Since this has to be a static method, call a method on the class to handle server requests. 
    psSLS->ProcessSendRequests(); 
} 

void SSLSocket::ProcessSendRequests() 
{ 
    // This method handles sending msgs to the server. 
    // 
    std::stringstream ss; 
    DWORD WaitResult; 
    Log.LogString("SSLSocket::ProcessSendRequests: Worker thread " + Logger::NumberToString(boost::this_thread::get_id()) + " started.\n", LogInfo); 
    // Loop until the user quits, or an error of some sort is thrown. 
    try 
    { 
     do 
     { 
     // If there are one or more msgs that need to be sent to a server, then send them out. 
     if (SendMsgQ.Count() > 0) 
     { 
      Message* pMsg = SendMsgQ.Front(); 
      SSLSocket* pSSL = pMsg->pSSL; 
      SendMsgQ.Pop(); 
      const Byte* pBuf = pMsg->pBuf; 
      const int BytesInMsg = pMsg->BytesInMsg; 
      boost::system::error_code Error; 
      { 
       Locking CodeLock(SocketLock); // Single thread the code. 
       boost::asio::async_write(*pSSL->pSocket, boost::asio::buffer(pBuf, BytesInMsg), boost::bind(&SSLSocket::HandleWrite, this, 
        boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
      } 
      ss << "SSLSocket::ProcessSendRequests: # bytes sent = " << BytesInMsg << "\n"; 
      Log.LogString(ss.str(), LogDebug2); 
      Log.LogBuf(pBuf, BytesInMsg, DisplayInHex, LogDebug3); 
     } 
     else 
     { 
      // Nothing to send, so go into a wait state. 
      WaitResult = WaitForSingleObject(hEvent, INFINITE); 
      if (WaitResult != 0L) 
      { 
       Log.LogString("SSLSocket::ProcessSendRequests: WaitForSingleObject event error. Code = " + Logger::NumberToString(GetLastError()) + ". \n", LogError); 
      } 
     } 
     } while (ReqAlive); 
     Log.LogString("SSLSocket::ProcessSendRequests: Worker thread " + Logger::NumberToString(boost::this_thread::get_id()) + " done.\n", LogInfo); 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::ProcessSendRequests: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::HandleWrite(const boost::system::error_code& error, size_t bytesTransferred) 
{ 
    // This method is called after a msg has been written out to the socket. Nothing to do really since reading is handled by the HandleRead method. 

    std::stringstream ss; 
    try 
    { 
     if (error) 
     { 
     ss << "SSLSocket::HandleWrite: failed - " << error.message() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
     } 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::HandleHandshake: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::RcvWorkerThread(SSLSocket* psSLS) 
{ 
    // This is the method that gets called when the receive thread is created by this class. 
    // This thread method focuses on processing messages received from the server. 
    // 
    // Since this has to be a static method, call a method on the class to handle server requests. 
    psSLS->InitAsynchIO(); 
} 

void SSLSocket::InitAsynchIO() 
{ 
    // This method is responsible for initiating asynch i/o. 
    boost::system::error_code Err; 
    string s; 
    stringstream ss; 
    // 
    try 
    { 
     ss << "SSLSocket::InitAsynchIO: Worker thread - " << Logger::NumberToString(boost::this_thread::get_id()) << " started.\n"; 
     Log.LogString(ss.str(), LogInfo); 
     // Enable the handlers for asynch i/o. The thread will hang here until the stop method has been called or an error occurs. 
     // Add a work object so the thread will be dedicated to handling asynch i/o. 
     boost::asio::io_service::work work(*IOService); 
     IOService->run(); 
     Log.LogString("SSLSocket::InitAsynchIO: receive worker thread done.\n", LogInfo); 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::InitAsynchIO: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::HandleConnect(const boost::system::error_code& error) 
{ 
    // This method is called asynchronously when the server has responded to the connect request. 
    std::stringstream ss; 
    try 
    { 
     if (!error) 
     { 
     pSocket->async_handshake(boost::asio::ssl::stream_base::client, 
      boost::bind(&SSLSocket::HandleHandshake, this, boost::asio::placeholders::error)); 
     ss << "SSLSocket::HandleConnect: From worker thread " << Logger::NumberToString(boost::this_thread::get_id()) << ".\n"; 
     Log.LogString(ss.str(), LogInfo); 
     } 
     else 
     { 
     // Log an error. This worker thread should exit gracefully after this. 
     ss << "SSLSocket::HandleConnect: connect failed to " << sClientIp << " : " << uiClientPort << ". Error: " << error.message() + ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
     } 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::InitAsynchIO: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::HandleHandshake(const boost::system::error_code& error) 
{ 
    // This method is called asynchronously when the server has responded to the handshake request. 
    std::stringstream ss; 
    try 
    { 
     if (!error) 
     { 
     // Try to send the first message that the server is expecting. This msg tells the server we want to start communicating. 
     // This is the only msg specified in the C++ code. All other msg processing is done in the C# code. 
     // 
     unsigned char Msg[27] = {0x17, 0x00, 0x00, 0x00, 0x06, 0x00, 0x01, 0x00, 0x00, 0x00, 0x0b, 0x00, 0x41, 
      0x74, 0x74, 0x61, 0x63, 0x6b, 0x50, 0x6f, 0x6b, 0x65, 0x72, 0x02, 0x00, 0x65, 0x6e}; 
     boost::system::error_code Err; 

     sClientIp = pSocket->lowest_layer().remote_endpoint().address().to_string(); 
     uiClientPort = pSocket->lowest_layer().remote_endpoint().port(); 
     ReqAlive = true; 
     // boost::asio::async_write(*pSocket, boost::asio::buffer(Msg), boost::bind(&SSLSocket::HandleFirstWrite, this, 
     // boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     int Count = boost::asio::write(*pSocket, boost::asio::buffer(Msg), boost::asio::transfer_exactly(27), Err); 
     if (Err) 
     { 
      ss << "SSLSocket::HandleHandshake: write failed - " << error.message() << ".\n"; 
      Log.LogString(ss.str(), LogInfo); 
     } 
     HandleFirstWrite(Err, Count); 
     // boost::asio::async_write(pSocket, boost::asio::buffer(Msg, 27), boost::bind(&SSLSocket::HandleWrite, this, 
     // boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     ss.str(""); 
     ss << "SSLSocket::HandleHandshake: From worker thread " << boost::this_thread::get_id() << ".\n"; 
     } 
     else 
     { 
     ss << "SSLSocket::HandleHandshake: failed - " << error.message() << ".\n"; 
     IOService->stop(); 
     } 
     Log.LogString(ss.str(), LogInfo); 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::HandleHandshake: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::HandleFirstWrite(const boost::system::error_code& error, size_t bytesTransferred) 
{ 
    // This method is called after a msg has been written out to the socket. 
    std::stringstream ss; 
    try 
    { 
     if (!error) 
     { 
     // boost::asio::async_read(pSocket, boost::asio::buffer(reply_, bytesTransferred), boost::bind(&SSLSocket::handle_read, 
     // this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     // boost::asio::async_read(pSocket, boost::asio::buffer(reply_, 84), boost::bind(&SSLSocket::handle_read, 
     // this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     // Locking CodeLock(ReadLock); // Single thread the code. 
     // Signal the other threads that msgs are now ready to be sent and received. 
     // boost::asio::async_read(pSocket, boost::asio::buffer(pRepBuf), boost::asio::transfer_exactly(4), boost::bind(&SSLSocket::HandleRead, 
     // this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     // 
     // Notify the UI that we are now connected. Create a 6 byte msg for this. 
     pDataBuf = BufMang.GetPtr(6); 
     BYTE* p = pDataBuf; 
     // Create msg type 500 
     *p = 244; 
     *++p = 1; 
     CallbackFunction(this, 2, (void*)pDataBuf); 
     // Get the 1st 4 bytes of the next msg, which is always the length of the that msg. 
     pDataBuf = BufMang.GetPtr(MsgLenBytes); 

     // int i1=1,i2=2,i3=3,i4=4,i5=5,i6=6,i7=7,i8=8,i9=9; 
     // (boost::bind(&nine_arguments,_9,_2,_1,_6,_3,_8,_4,_5,_7)) 
     //  (i1,i2,i3,i4,i5,i6,i7,i8,i9); 

     // boost::asio::read(*pSocket, boost::asio::buffer(pReqBuf, MsgLenBytes), boost::asio::transfer_exactly(MsgLenBytes), Err); 
     // boost::asio::async_read(pSocket, boost::asio::buffer(pReqBuf, MsgLenBytes), boost::bind(&SSLSocket::HandleRead, _1,_2,_3)) 
     // (this, pReqBuf, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred); 
     // boost::asio::async_read(*pSocket, boost::asio::buffer(reply_), boost::asio::transfer_exactly(ByteCount), boost::bind(&Client::handle_read, 
     //  this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     // boost::asio::async_write(*pSocket, boost::asio::buffer(pDataBuf, MsgLenBytes), boost::bind(&SSLSocket::HandleWrite, this, 
     // boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 

     Locking CodeLock(SocketLock); // Single thread the code. 
     boost::asio::async_read(*pSocket, boost::asio::buffer(pDataBuf, MsgLenBytes), boost::bind(&SSLSocket::HandleRead, this, 
      boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     } 
     else 
     { 
     ss << "SSLSocket::HandleFirstWrite: failed - " << error.message() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
     } 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::HandleFirstWrite: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::HandleRead(const boost::system::error_code& error, size_t bytesTransferred) 
{ 
    // This method is called to process an incomming message. 
    // 
    std::stringstream ss; 
    int ByteCount; 
    try 
    { 
     ss << "SSLSocket::HandleRead: From worker thread " << boost::this_thread::get_id() << ".\n"; 
     Log.LogString(ss.str(), LogInfo); 
     // Set to exit this thread if the user is done. 
     if (!ReqAlive) 
     { 
     // IOService->stop(); 
     return; 
     } 
     if (!error) 
     { 
     // Get the number of bytes in the message. 
     if (bytesTransferred == 4) 
     { 
      ByteCount = BytesToInt(pDataBuf); 
     } 
     else 
     { 
      // Call the C# callback method that will handle the message. 
      ss << "SSLSocket::HandleRead: From worker thread " << boost::this_thread::get_id() << "; # bytes transferred = " << bytesTransferred << ".\n"; 
      Log.LogString(ss.str(), LogDebug2); 
      Log.LogBuf(pDataBuf, (int)bytesTransferred, true, LogDebug3); 
      Log.LogString("SSLSocket::HandleRead: sending msg to the C# client.\n\n", LogDebug2); 
      CallbackFunction(this, bytesTransferred, (void*)pDataBuf); 
      // Prepare to read in the next message length. 
      ByteCount = MsgLenBytes; 
     } 
     pDataBuf = BufMang.GetPtr(ByteCount); 
     boost::system::error_code Err; 
     // boost::asio::async_read(pSocket, boost::asio::buffer(pDataBuf, ByteCount), boost::bind(&SSLSocket::HandleRead, 
      // this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     Locking CodeLock(SocketLock); // Single thread the code. 
     boost::asio::async_read(*pSocket, boost::asio::buffer(pDataBuf, ByteCount), boost::bind(&SSLSocket::HandleRead, 
      this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     // boost::asio::read(pSocket, boost::asio::buffer(reply_), boost::asio::transfer_exactly(ByteCount), Err); 
     } 
     else 
     { 
     Log.LogString("SSLSocket::HandleRead failed: " + error.message() + "\n", LogError); 
     Stop(); 
     } 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::HandleRead: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::Stop() 
{ 
    // This method calls the shutdown method on the socket in order to stop reads or writes that might be going on. If this is not done, then an exception will be thrown 
    // when it comes time to delete this object. 
    ReqAlive = false; 
    SetEvent(hEvent); 
    IOService->stop(); 
} 

SSLSocket.cpp : 도움이된다면 그래서, 여기 내 코드입니다

  1. ser에 연결할 때 ver를 처음으로 사용하면 SSLSocket 클래스의 새 인스턴스가 만들어집니다. io_service 객체는 정적이며 한 번만 생성됩니다. 이 클래스는, SSLSocket 클래스의 6 개의 인스턴스 모두로 사용됩니다.

  2. 6 개의 서버 모두에서 소켓 통신과 관련된 모든 작업에 2 개의 스레드가 사용됩니다. 하나의 스레드는 서버로부터 수신 된 메시지를 처리하기위한 것입니다. 다른 스레드는 서버에 메시지를 보내는 데 사용됩니다.

  3. 이 코드는 SSL/TSL을 사용합니다. 스트레이트 TCP를 사용하고있는 경우는, SSLSocket :: Connect 메소드 및 ssl #include 행의 3 행을 간단히 삭제할 수 있습니다.

  4. HandleRead에서 사용되는 기술은 이중 읽기 방법을 사용합니다. 첫 번째 읽기는 바이트 수를 가져오고 (프로토콜은 처음 4 바이트를 메시지 길이로 사용하므로) 두 번째 바이트는 해당 메시지의 총 바이트 수를 가져옵니다. 이것은 소켓에서 데이터를 읽는 것을 처리하는 가장 효율적이거나 심지어 가장 바람직한 방법이 아닐 수도 있습니다. 그러나 이해하는 것이 가장 쉽고 간단합니다. 프로토콜이 다르거 나 메시지 크기가 훨씬 크고 전체 메시지를 받기 전에 메시지 처리를 시작할 수있는 경우 다른 접근 방법을 사용하는 것이 좋습니다.

  5. 이 코드는 Visual Studio 2008 for Windows에서 Boost 1.52.0을 사용합니다.

+0

감사합니다. Sam이 언급했듯이, 더 많은 연결이있을 때까지는 비동기 경로로가는 것이 효과적이지 않을 수도 있음을 언급하면서 감사의 말을 전합니다. 나는 이것을 지금 내 주머니에 보관할 것이다. – RishiD

+0

이 코드는 한눈에 'Locking CodeLock (SocketLock); boost :: asio :: async_write (...);'? 주석은 이것이 단일 스레드 여야 함을 나타내는 것 같습니다. 뮤텍스 (mutex)는 애플리케이션이 스트림 당 최대 하나의 쓰기 작업을 수행해야하기 때문에 충분하지 않습니다. –

+0

내가 처음에 이런 문제를 겪었을 때 얼마나 많은 쓰레드가 쓰일지 확신하지 못했고, 다중 소켓 쓰레드가 같은 소켓 객체를 사용하려고 할 수도 있다고 걱정했다. 나는 나중에 읽기를 위해 하나의 스레드를 사용하고 쓰기를 위해 하나의 스레드를 사용하도록 코드를 수정했지만 잠금 장치에는 남아있었습니다. 나는 그들을 데리고 나가야한다. "스트림 하나당 최대 하나의 쓰기 작업을 보장해야하기 때문에 뮤텍스가 충분하지 않습니다."라는 말은 무엇을 의미합니까? ? 대신 당신은 무엇을합니까? HTTP Server 3 예제를 보셨습니까? –

1

Asio examples에 포함 된 일대 다 클라이언트 - 서버 디자인의 직접적인 예는 없습니다. 디자인이 최대 10 개의 연결로 고정되어 있다면 각 스레드에 대해 동기식 통신을 사용하는 것이 좋습니다. 그러나 이것을 이보다 훨씬 더 확장하려는 경우, 수 백 또는 수천 개의 스레드를 생성함으로써 점차 수익이 감소하는 것을 볼 수 있습니다.

즉, async_connectasync_readasync_write과 결합하여 사용하는 것이 이해하기 쉽거나 구현하기가 어렵지 않습니다. 나는 동일한 개념을 사용하여 소수의 스레드만을 사용하여 world's fastest supercomputer에서 수천 개의 연결을 관리했습니다. async TCP client example은 아마도이 경로를 선택하면 공부하는 것이 가장 좋습니다.

예 이외의 다른 것을 찾고 계신다면 유용하다고 생각되는 Asio를 사용하는 사람이 많습니다. open source projects

+0

입력 해 주셔서 감사합니다. 아직 비동기 통신 주위에 두뇌를 감싸고 있습니다. 나는 서버 측의 값을 이해하지만 클라이언트 측에서 그 값을 볼 수 없었다. – RishiD