내가 부스트를 사용하고 :: ASIO 및 부스트 : 스레드을 메시지를 수락 메시지 서비스를 실현하기 위해 보내 그들 비동기 어떤 메시지가없는 경우 처리 중이거나 대기열 메시지가 처리되고있는 경우.잘못 사용 :: ASIO 및 부스트 : 스레드
메시지의 속도가 내 머리에 높습니다. 약 초당 2.000 메시지. 너무 많은 메시지로 나는 아주 드문 메시지에 직면한다. 2.000 메시지에서 약 4-8이 손상되었습니다. 문제는 boost :: asio 및/또는 boost :: thread 라이브러리의 잘못된 사용 때문이라고 생각합니다.
구현 한 코드는 주로 this boost tutorial을 기반으로합니다. 나는 실수를 발견 할 수 없으며 주요 메시지가 나오기 때문에 문제의 범위를 좁히기가 어렵다.
혹시 다른 사람이 무슨 일이 벌어지고 있는지 생각해 보셨나요? 기본적으로이 클래스는 다음과 같은 방법으로 사용된다
:
(1) 생성자는 메시지
을 승인하고 전송 스레드, 따라서 서비스를 시작하기 위해 내 프로그램의 시작 부분에서 호출 (2) 메시지를 전송할 때마다 MessageService::transmitMessage()
으로 전화를 걸어 async_write
이라는 작업을 메시지 큐를 처리하는 스레드에 위임합니다.
using namespace google::protobuf::io;
using boost::asio::ip::tcp;
MessageService::MessageService(std::string ip, std::string port) :
work(io_service), resolver(io_service), socket(io_service) {
messageQueue = new std::deque<AgentMessage>;
tcp::resolver::query query(ip, port);
endpoint_iterator = resolver.resolve(query);
tcp::endpoint endpoint = *endpoint_iterator;
socket.async_connect(endpoint, boost::bind(&MessageService::handle_connect,
this, boost::asio::placeholders::error, ++endpoint_iterator));
boost::thread t(boost::bind(&boost::asio::io_service::run, &io_service));
}
void MessageService::await() {
while (!messageQueue->empty()) {
signal(SIGINT, exit);
int messagesLeft = messageQueue->size();
sleep(3);
std::cout << "Pending Profiler Agents Messages: "
<< messageQueue->size() << std::endl;
if (messagesLeft == messageQueue->size()) {
std::cout << "Connection Error" << std::endl;
break;
}
}
std::cout << i << std::endl;
}
void MessageService::write(AgentMessage agentMessage, long systemTime,
int JVM_ID) {
agentMessage.set_timestamp(Agent::Helper::getCurrentClockCycle());
agentMessage.set_jvm_id(JVM_ID);
agentMessage.set_systemtime(systemTime);
io_service.post(boost::bind(&MessageService::do_write, this, agentMessage));
}
void MessageService::do_close() {
socket.close();
}
void MessageService::transmitMessage(AgentMessage agentMessage) {
++i;
boost::asio::streambuf b;
std::ostream os(&b);
ZeroCopyOutputStream *raw_output = new OstreamOutputStream(&os);
CodedOutputStream *coded_output = new CodedOutputStream(raw_output);
coded_output->WriteVarint32(agentMessage.ByteSize());
agentMessage.SerializeToCodedStream(coded_output);
delete coded_output;
delete raw_output;
boost::system::error_code ignored_error;
boost::asio::async_write(socket, b.data(), boost::bind(
&MessageService::handle_write, this,
boost::asio::placeholders::error));
}
void MessageService::do_write(AgentMessage agentMessage) {
bool write_in_progress = !messageQueue->empty();
messageQueue->push_back(agentMessage);
if (!write_in_progress) {
transmitMessage(agentMessage);
}
}
void MessageService::handle_write(const boost::system::error_code &error) {
if (!error) {
messageQueue->pop_front();
if (!messageQueue->empty()) {
transmitMessage(messageQueue->front());
}
} else {
std::cout << error << std::endl;
do_close();
}
}
void MessageService::handle_connect(const boost::system::error_code &error,
tcp::resolver::iterator endpoint_iterator) {
// can be used to receive commands from the Java profiler interface
}
MessageService::~MessageService() {
// TODO Auto-generated destructor stub
}
헤더 파일 :이 방법은 나에게 의심스러운 것 같다
using boost::asio::ip::tcp;
class MessageService {
public:
MessageService(std::string ip, std::string port);
virtual ~MessageService();
void write(AgentMessage agentMessage, long systemTime, int JVM_ID);
void await();
private:
boost::asio::io_service io_service;
boost::asio::io_service::work work;
tcp::resolver resolver;
tcp::resolver::iterator endpoint_iterator;
tcp::socket socket;
std::deque<AgentMessage> *messageQueue;
void do_write(AgentMessage agentMessage);
void do_close();
void handle_write(const boost::system::error_code &error);
void handle_connect(const boost::system::error_code &error,
tcp::resolver::iterator endpoint_iterator);
void transmitMessage(AgentMessage agentMessage);
};
'boost :: thread'에 대해 언급 했으므로 프로그램에서 둘 이상의 스레드를 사용하고 있다고 가정 할 수 있지만 메시지 대기열에 대한 액세스를 보장하기 위해 잠글 필요가 없습니다 (유일한 공유라고 가정합니다). 리소스)는 안전합니다 - 코드를 통과하지 못했음을 인정해야합니다. 큐에 대한 액세스가 단일 스레드 인 경우 ... –
메시지와 큐를 처리하는 스레드가 하나만 있다고 생각합니다. 유일한 함수로, 적어도 내가 생각하기에 이것은 다중 스레드에 의해 호출되는이 방법이다. io_service.post (boost :: bind (& MessageService :: do_write, this, agentMessage)); 내가 생각한 것은 생성자에서 시작된 하나의 스레드에 의해서만 실행됩니다. 하지만 어쩌면 내가 틀렸어. –
'transmitMessage'가 다중 스레드에 의해 호출되지 않으면, io 서비스에서 단일 스레드 만 전달됩니다. 위의 코드에서 아무 것도 잘못 볼 수 없습니다 (다른 스레드에서'write' 만 호출하면됩니다 - 메인 스레드라고 말하면됩니다) - 나는 리시버 코드를 검사 할 것입니다. – Nim