일반적으로 내가 본 것처럼 common way to create thread pools via "io_service + thread_group"입니다. const 크기의 스레드 풀에 정말 좋습니다. 또는 단지 더 커질 수있는 수영장. 그러나 나는 모든 수영장을 멈추게하면서 어떻게 수영장을 더 작게 만드는지 궁금합니다.어떻게 boost :: thread_group을 작게 만들려면 boost :: asio :: io_service ::가 스레드에서 실행 되나요?
그래서 우리는 shown
// class variables
asio::io_service io_service;
boost::thread_group threads;
asio::io_service::work *work;
// some pool init function
work = new asio::io_service::work(io_service);
int cores_number = boost::thread::hardware_concurrency();
for (std::size_t i = 0; i < cores_number; ++i)
threads.create_thread(boost::bind(&asio::io_service::run, &io_service));
// and now we can simply post tasks
io_service.post(boost::bind(&class_name::an_expensive_calculation, this, 42));
io_service.post(boost::bind(&class_name::a_long_running_task, this, 123));
// and it is really eazy to make pool biger - just call (mutexes may be required)
threads.create_thread(boost::bind(&asio::io_service::run, &io_service));
으로해야하지만 우리는 우리의 스레드 풀에서 스레드를 제거하기 위해 무엇을해야할까요? 우리는 단순히 threads.remove_thread(thread* thrd);
을 호출 할 수 없기 때문에 &asio::io_service::run
(IMHO)에서 실행을 멈추지 않을 것입니다. 그래서 궁금합니다. 가능하고 실제로 그러한 풀에서 스레드를 제거하는 방법이 있습니까? (tham을 방해하지 않고 현재 스레드 작업이 범위를 벗어날 때까지 기다리는 것)?
업데이트 : 스레드 원하는 삶의 시간, 스레드 풀 : 여기
몇 가지 간단한 컴파일 가능한 코드입니다. 당신이 스레드가 스레드에서 제거되지 않습니다 볼 수 있듯이
#include <stdio.h>
#include <iostream>
#include <fstream>
//Boost
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/thread/locks.hpp>
boost::asio::io_service io_service;
boost::asio::io_service::work *work;
boost::thread_group threads;
boost::mutex threads_creation;
int time_limit;
int calculate_the_answer_to_life_the_universe_and_everything(int i)
{
boost::this_thread::sleep(boost::posix_time::milliseconds(i));
std::cout << i << std::endl;
return i;
}
void run(boost::shared_ptr<boost::thread> thread_ptr)
{
try
{
io_service.run();
}
catch(std::exception &e)
{
std::cout << "exeption: " << e.what() << std::endl;
boost::mutex::scoped_lock lock(threads_creation);
threads.remove_thread(thread_ptr.get());
lock.unlock();
std::cout << "thread removed from group" << std::endl;
return;
}
}
void pool_item(int i)
{
boost::packaged_task<int> pt(boost::bind(calculate_the_answer_to_life_the_universe_and_everything, i));
boost::unique_future<int> fi=pt.get_future();
boost::thread *task = new boost::thread(std::move(pt)); // launch task on a thread
if(fi.timed_wait(boost::posix_time::milliseconds(time_limit)))
{
std::cout << "sucsess function returned: " << fi.get() << std::endl;
}
else
{
std::cout << "request took way 2 long!" << std::endl;
std::cout << "current group size:" << threads.size() << std::endl;
boost::shared_ptr<boost::thread> thread;
boost::packaged_task<void> pt(boost::bind(run, thread));
thread = boost::shared_ptr<boost::thread>(new boost::thread(std::move(pt)));
boost::mutex::scoped_lock lock(threads_creation);
threads.add_thread(thread.get());
lock.unlock();
task->join();
throw std::runtime_error("killed joined thread");
}
}
int main()
{
time_limit = 500;
work = new boost::asio::io_service::work(io_service);
int cores_number = boost::thread::hardware_concurrency();
for (std::size_t i = 0; i < cores_number; ++i)
{
boost::shared_ptr<boost::thread> thread;
boost::packaged_task<void> pt(boost::bind(run, thread));
thread = boost::shared_ptr<boost::thread>(new boost::thread(std::move(pt)));
threads.add_thread(thread.get());
}
int i = 800;
io_service.post(boost::bind(pool_item, i));
boost::this_thread::sleep(boost::posix_time::milliseconds(i*2));
std::cout << "thread should be removed by now." << std::endl
<< "group size:" << threads.size() << std::endl;
std::cin.get();
return 0;
}
도 .remove_thread(ptr);
호출 후 풀 = (왜
업데이트 # 2 :.? 내가 함께 결국
그럼 어떤 식 으로든 의상 스레드 그룹 ...
#include <stdio.h>
#include <iostream>
#include <fstream>
#include <set>
//Boost
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/thread/locks.hpp>
//cf service interface
//#include <service.hpp>
//cf-server
//#include <server.h>
#include <boost/foreach.hpp>
class thread_group
{
public:
void add(boost::shared_ptr<boost::thread> to_add)
{
boost::mutex::scoped_lock lock(m);
ds_.insert(to_add);
}
void remove(boost::shared_ptr<boost::thread> to_remove)
{
boost::mutex::scoped_lock lock(m);
ds_.erase(to_remove);
}
int size()
{
boost::mutex::scoped_lock lock(m);
return ds_.size();
}
void join_all(boost::posix_time::milliseconds interuption_time=boost::posix_time::milliseconds(1000))
{
boost::mutex::scoped_lock lock(m);
BOOST_FOREACH(boost::shared_ptr<boost::thread> t, ds_)
{
boost::thread interrupter(boost::bind(&thread_group::interupt_thread, this, t, interuption_time));
}
}
private:
std::set< boost::shared_ptr<boost::thread> > ds_;
boost::mutex m;
void interupt_thread(boost::shared_ptr<boost::thread> t, boost::posix_time::milliseconds interuption_time)
{
try
{
if(!t->timed_join(interuption_time))
t->interrupt();
}
catch(std::exception &e)
{
}
}
};
boost::asio::io_service io_service;
boost::asio::io_service::work *work;
thread_group threads;
int time_limit;
int calculate_the_answer_to_life_the_universe_and_everything(int i)
{
boost::this_thread::sleep(boost::posix_time::milliseconds(i));
std::cout << i << std::endl;
return i;
}
void run(boost::shared_ptr<boost::thread> thread_ptr)
{
try
{
io_service.run();
}
catch(std::exception &e)
{
std::cout << "exeption: " << e.what() << std::endl;
threads.remove(thread_ptr);
std::cout << "thread removed from group" << std::endl;
return;
}
}
void pool_item(int i)
{
boost::packaged_task<int> pt(boost::bind(calculate_the_answer_to_life_the_universe_and_everything, i));
boost::unique_future<int> fi=pt.get_future();
boost::thread *task = new boost::thread(std::move(pt)); // launch task on a thread
if(fi.timed_wait(boost::posix_time::milliseconds(time_limit)))
{
std::cout << "sucsess function returned: " << fi.get() << std::endl;
}
else
{
std::cout << "request took way 2 long!" << std::endl;
std::cout << "current group size:" << threads.size() << std::endl;
std::cout << "we want to add thread!" << std::endl;
boost::shared_ptr<boost::thread> thread;
boost::packaged_task<void> pt(boost::bind(run, thread));
threads.add(thread);
std::cout << "thread added" << std::endl
<< "current group size:" << threads.size() << std::endl;
task->join();
throw std::runtime_error("killed joined thread");
}
}
int main()
{
time_limit = 500;
work = new boost::asio::io_service::work(io_service);
int cores_number = boost::thread::hardware_concurrency();
for (std::size_t i = 0; i < cores_number; ++i)
{
boost::shared_ptr<boost::thread> thread;
boost::packaged_task<void> pt(boost::bind(run, thread));
thread = boost::shared_ptr<boost::thread>(new boost::thread(std::move(pt)));
threads.add(thread);
}
int i = 800;
io_service.post(boost::bind(pool_item, i));
boost::this_thread::sleep(boost::posix_time::milliseconds(i*2));
std::cout << "thread should be removed by now." << std::endl
<< "group size:" << threads.size() << std::endl;
std::cin.get();
return 0;
}
내 게시물 업데이트를 참조하십시오. – Rella