2011-08-28 2 views
1

일반적으로 내가 본 것처럼 common way to create thread pools via "io_service + thread_group"입니다. const 크기의 스레드 풀에 정말 좋습니다. 또는 단지 더 커질 수있는 수영장. 그러나 나는 모든 수영장을 멈추게하면서 어떻게 수영장을 더 작게 만드는지 궁금합니다.어떻게 boost :: thread_group을 작게 만들려면 boost :: asio :: io_service ::가 스레드에서 실행 되나요?

그래서 우리는 shown

// class variables 
asio::io_service io_service; 
boost::thread_group threads; 
asio::io_service::work *work; 

// some pool init function 
work = new asio::io_service::work(io_service); 
int cores_number = boost::thread::hardware_concurrency(); 
    for (std::size_t i = 0; i < cores_number; ++i) 
    threads.create_thread(boost::bind(&asio::io_service::run, &io_service)); 

// and now we can simply post tasks 
io_service.post(boost::bind(&class_name::an_expensive_calculation, this, 42)); 
io_service.post(boost::bind(&class_name::a_long_running_task, this, 123)); 

// and it is really eazy to make pool biger - just call (mutexes may be required) 
threads.create_thread(boost::bind(&asio::io_service::run, &io_service)); 

으로해야하지만 우리는 우리의 스레드 풀에서 스레드를 제거하기 위해 무엇을해야할까요? 우리는 단순히 threads.remove_thread(thread* thrd);을 호출 할 수 없기 때문에 &asio::io_service::run (IMHO)에서 실행을 멈추지 않을 것입니다. 그래서 궁금합니다. 가능하고 실제로 그러한 풀에서 스레드를 제거하는 방법이 있습니까? (tham을 방해하지 않고 현재 스레드 작업이 범위를 벗어날 때까지 기다리는 것)?

업데이트 : 스레드 원하는 삶의 시간, 스레드 풀 : 여기

몇 가지 간단한 컴파일 가능한 코드입니다. 당신이 스레드가 스레드에서 제거되지 않습니다 볼 수 있듯이

#include <stdio.h> 
#include <iostream> 
#include <fstream> 

//Boost 
#include <boost/asio.hpp> 
#include <boost/thread.hpp> 
#include <boost/thread/locks.hpp> 

boost::asio::io_service io_service; 
boost::asio::io_service::work *work; 
boost::thread_group threads; 
boost::mutex threads_creation; 
int time_limit; 

int calculate_the_answer_to_life_the_universe_and_everything(int i) 
{ 
    boost::this_thread::sleep(boost::posix_time::milliseconds(i)); 
    std::cout << i << std::endl; 
    return i; 
} 

void run(boost::shared_ptr<boost::thread> thread_ptr) 
{ 
    try 
    { 
     io_service.run(); 
    } 
    catch(std::exception &e) 
    { 
     std::cout << "exeption: " << e.what() << std::endl; 
     boost::mutex::scoped_lock lock(threads_creation); 
     threads.remove_thread(thread_ptr.get()); 
     lock.unlock(); 
     std::cout << "thread removed from group" << std::endl; 
     return; 
    } 

} 

void pool_item(int i) 
{ 
    boost::packaged_task<int> pt(boost::bind(calculate_the_answer_to_life_the_universe_and_everything, i)); 
    boost::unique_future<int> fi=pt.get_future(); 

    boost::thread *task = new boost::thread(std::move(pt)); // launch task on a thread 

    if(fi.timed_wait(boost::posix_time::milliseconds(time_limit))) 
    { 
     std::cout << "sucsess function returned: " << fi.get() << std::endl; 
    } 
    else 
    { 
     std::cout << "request took way 2 long!" << std::endl; 

     std::cout << "current group size:" << threads.size() << std::endl; 

     boost::shared_ptr<boost::thread> thread; 
     boost::packaged_task<void> pt(boost::bind(run, thread)); 
     thread = boost::shared_ptr<boost::thread>(new boost::thread(std::move(pt))); 

     boost::mutex::scoped_lock lock(threads_creation); 
     threads.add_thread(thread.get()); 
     lock.unlock(); 

     task->join(); 

     throw std::runtime_error("killed joined thread"); 

    } 
} 

int main() 
{ 
    time_limit = 500; 

    work = new boost::asio::io_service::work(io_service); 
    int cores_number = boost::thread::hardware_concurrency(); 
    for (std::size_t i = 0; i < cores_number; ++i) 
    { 

     boost::shared_ptr<boost::thread> thread; 
     boost::packaged_task<void> pt(boost::bind(run, thread)); 
     thread = boost::shared_ptr<boost::thread>(new boost::thread(std::move(pt))); 
     threads.add_thread(thread.get()); 
    } 

    int i = 800; 
    io_service.post(boost::bind(pool_item, i)); 

    boost::this_thread::sleep(boost::posix_time::milliseconds(i*2)); 
    std::cout << "thread should be removed by now." << std::endl 
     << "group size:" << threads.size() << std::endl; 

    std::cin.get(); 
    return 0; 
} 

.remove_thread(ptr); 호출 후 풀 = (왜

업데이트 # 2 :.? 내가 함께 결국

그럼 어떤 식 으로든 의상 스레드 그룹 ...

#include <stdio.h> 
#include <iostream> 
#include <fstream> 
#include <set> 

//Boost 
#include <boost/asio.hpp> 
#include <boost/thread.hpp> 
#include <boost/thread/locks.hpp> 

//cf service interface 
//#include <service.hpp> 

//cf-server 
//#include <server.h> 

#include <boost/foreach.hpp> 

class thread_group 
{ 
public: 
    void add(boost::shared_ptr<boost::thread> to_add) 
    { 
     boost::mutex::scoped_lock lock(m); 
     ds_.insert(to_add); 
    } 
    void remove(boost::shared_ptr<boost::thread> to_remove) 
    { 
     boost::mutex::scoped_lock lock(m); 
     ds_.erase(to_remove); 
    } 

    int size() 
    { 
     boost::mutex::scoped_lock lock(m); 
     return ds_.size(); 
    } 

    void join_all(boost::posix_time::milliseconds interuption_time=boost::posix_time::milliseconds(1000)) 
    { 
     boost::mutex::scoped_lock lock(m); 
     BOOST_FOREACH(boost::shared_ptr<boost::thread> t, ds_) 
     { 
      boost::thread interrupter(boost::bind(&thread_group::interupt_thread, this, t, interuption_time)); 
     } 
    } 

private: 
    std::set< boost::shared_ptr<boost::thread> > ds_; 
    boost::mutex m; 
    void interupt_thread(boost::shared_ptr<boost::thread> t, boost::posix_time::milliseconds interuption_time) 
    { 
     try 
     { 
      if(!t->timed_join(interuption_time)) 
       t->interrupt(); 

     } 
     catch(std::exception &e) 
     { 
     } 
    } 
}; 

boost::asio::io_service io_service; 
boost::asio::io_service::work *work; 
thread_group threads; 
int time_limit; 



int calculate_the_answer_to_life_the_universe_and_everything(int i) 
{ 
    boost::this_thread::sleep(boost::posix_time::milliseconds(i)); 
    std::cout << i << std::endl; 
    return i; 
} 

void run(boost::shared_ptr<boost::thread> thread_ptr) 
{ 
    try 
    { 
     io_service.run(); 
    } 
    catch(std::exception &e) 
    { 
     std::cout << "exeption: " << e.what() << std::endl; 
     threads.remove(thread_ptr); 
     std::cout << "thread removed from group" << std::endl; 
     return; 
    } 

} 

void pool_item(int i) 
{ 
    boost::packaged_task<int> pt(boost::bind(calculate_the_answer_to_life_the_universe_and_everything, i)); 
    boost::unique_future<int> fi=pt.get_future(); 

    boost::thread *task = new boost::thread(std::move(pt)); // launch task on a thread 

    if(fi.timed_wait(boost::posix_time::milliseconds(time_limit))) 
    { 
     std::cout << "sucsess function returned: " << fi.get() << std::endl; 
    } 
    else 
    { 
     std::cout << "request took way 2 long!" << std::endl; 

     std::cout << "current group size:" << threads.size() << std::endl; 
     std::cout << "we want to add thread!" << std::endl; 
     boost::shared_ptr<boost::thread> thread; 
     boost::packaged_task<void> pt(boost::bind(run, thread)); 
     threads.add(thread); 
     std::cout << "thread added" << std::endl 
      << "current group size:" << threads.size() << std::endl; 
     task->join(); 

     throw std::runtime_error("killed joined thread"); 

    } 
} 

int main() 
{ 
    time_limit = 500; 

    work = new boost::asio::io_service::work(io_service); 
    int cores_number = boost::thread::hardware_concurrency(); 
    for (std::size_t i = 0; i < cores_number; ++i) 
    { 

     boost::shared_ptr<boost::thread> thread; 
     boost::packaged_task<void> pt(boost::bind(run, thread)); 
     thread = boost::shared_ptr<boost::thread>(new boost::thread(std::move(pt))); 
     threads.add(thread); 
    } 

    int i = 800; 
    io_service.post(boost::bind(pool_item, i)); 

    boost::this_thread::sleep(boost::posix_time::milliseconds(i*2)); 
    std::cout << "thread should be removed by now." << std::endl 
     << "group size:" << threads.size() << std::endl; 

    std::cin.get(); 
    return 0; 
} 

답변

3

나는 이것을 달성 할 수 있었다. 콜백이 예외를 던지면 run()이 종료된다는 사실을 이용하여 과거에는 예외였습니다.

: 당신이해야 할 모든 예외가 발생합니다 콜백을 예약입니다 그리고

void RunIOService() 
{ 
    try 
    { 
     io_service.run(); 
    } 
    catch(std::exception ex) 
    { 
    } 
} 

: 대신 스레드에서 직접 run()를 시작, 나는 적절한 예외가 발생하면 스레드를 종료하는 유틸리티 함수를 호출

static void KillThreadCallback() 
{ 
    // throw some exception that you catch above 
} 

io_service.post(&KillThreadCallback); 

이 콜백을 실행하는 스레드가 종료되어 기본적으로 스레드 풀 개수를 1 줄입니다.이 방법을 사용하면 io_service 스레드 풀을 매우 쉽게 확장하고 축소 할 수 있습니다. 깨끗하게 (+ +0 람다 C를 사용)하는 I/O 서비스를 종료에 사용될 수

+0

내 게시물 업데이트를 참조하십시오. – Rella

1

한 패턴 : m_keepRunning는 멤버 변수 인

void ThreadLoop() 
{ 
    while(m_keepRunning) { 
     try { 
      io_service.run_one(); 
     } catch(const std::exception& e) { 
      // error handling 
     } 
    } 
} 

void Stop() 
{ 
    // Using C++0x lambdas 
    io_service.post([=]{ m_keepRunning = false; }); 
    // or 
    io_service.post(boost::bind(&ThisClass::StopCallback, this)); 
} 

void StopCallback() 
{ 
    m_keepRunning = false; 
} 

. I/O 서비스 스레드에서만 닿아 야합니다.

관련 문제