2013-05-15 2 views
2

스레드 ONE이 데이터를 처리하는 4-5 워커 스레드와 모든 워커 스레드가 완료된 곳에서 멀티 스레딩을하고 싶습니다. 계속 진행하겠습니다. 나는 그러나 내가 동기화 문제가 있다는 것을 깨닫기 위해 부스트를 사용하고 있습니다. 하나의 의미에서 프로그램은 멈추고 계속 작동하지 않습니다. 부스트 C++을 이용한 멀티 스레딩 - 동기화 문제

나는 전에 OpenMP를 사용하고는 잘 작동하지만 개별적으로 스레드 우선 순위를 설정하려는 내가 OpenMP를 함께 그러므로 나는 내 자신의 솔루션에 근무 그렇게하는 방법을 알아낼 수 없습니다 :

나는 매우 것 일부는이 코드에서 버그를 찾기위한 힌트를 줄 수 있거나 문제에 대한 다른 접근법을 찾는데 도움이 될 수 있다면 기쁩니다.

, 감사합니다 KmgL 그것은 Boost futures 함께 할 수

#include <QCoreApplication> 

#include <boost/thread.hpp> 

#define N_CORE 6 
#define N_POINTS 10 
#define N_RUNS 100000 

class Sema{ 

public: 
    Sema(int _n =0): m_count(_n),m_mut(),m_cond(){} 

    void set(int _n) 
    { 
     boost::unique_lock<boost::mutex> w_lock(m_mut); 
     m_count = -_n; 
    } 

    void wait() 
    { 
     boost::unique_lock<boost::mutex> lock(m_mut); 
     while (m_count < 0) 
     { 
      m_cond.wait(lock); 
     } 
     --m_count; 
    } 
    void post() 
    { 
     boost::unique_lock<boost::mutex> lock(m_mut); 
     ++m_count; 
     m_cond.notify_all(); 
    } 


private: 
    boost::condition_variable m_cond; 
    boost::mutex m_mut; 
    int m_count; 

}; 

class Pool 
{ 
private: 
    boost::thread m_WorkerThread; 
    boost::condition_variable m_startWork; 
    bool m_WorkerRun; 
    bool m_InnerRun; 
    Sema * m_sem; 

    std::vector<int> *m_Ep; 
    std::vector<int> m_ret; 

    void calc() 
    { 
     unsigned int no_pt(m_Ep->size());     
     std::vector<int> c_ret; 
     for(unsigned int i=0;i<no_pt;i++) 
      c_ret.push_back(100 + m_Ep->at(i)); 

     m_ret = c_ret; 
    } 
    void run() 
    { 
     boost::mutex WaitWorker_MUTEX; 
     while(m_WorkerRun) 
     { 
      boost::unique_lock<boost::mutex> u_lock(WaitWorker_MUTEX); 
      m_startWork.wait(u_lock); 
      calc(); 
      m_sem->post(); 
     } 

    } 

public: 
    Pool():m_WorkerRun(false),m_InnerRun(false){} 
    ~Pool(){} 
    void start(Sema * _sem){ 
     m_WorkerRun = true; 
     m_sem = _sem; 
     m_ret.clear(); 
     m_WorkerThread = boost::thread(&Pool::run, this);} 
    void stop(){m_WorkerRun = false;} 
    void join(){m_WorkerThread.join();} 

    void newWork(std::vector<int> &Ep) 
    { 
     m_Ep = &Ep; 
     m_startWork.notify_all(); 
    } 
    std::vector<int> getWork(){return m_ret;} 



}; 

int main(int argc, char *argv[]) 
{ 
    QCoreApplication a(argc, argv); 

    Pool TP[N_CORE]; 

    Sema _sem(0); 
    for(int k=0;k<N_CORE;k++) 
     TP[k].start(&_sem); 


    boost::this_thread::sleep(boost::posix_time::milliseconds(10)); 

    std::vector<int> V[N_CORE]; 

    for(int k=0;k<N_CORE;k++) 
     for(int i=0;i<N_POINTS;i++) 
     { 
      V[k].push_back((k+1)*1000+i); 
     } 

    for(int j=0;j<N_RUNS;j++) 
    { 
     _sem.set(N_CORE); 
     for(int k=0;k<N_CORE;k++) 
     { 
      TP[k].newWork(V[k]); 
     } 

     _sem.wait(); 

     for(int k=0;k<N_CORE;k++) 
     { 
      V[k].clear(); 
      V[k]=TP[k].getWork(); 
      if(V[k].size()!=N_POINTS) 
       std::cout<<"ERROR: "<<"V["<<k<<"].size(): "<<V[k].size()<<std::endl; 

     } 
     if((j+1)%100==0) 
      std::cout<<"LOOP: "<<j+1<<std::endl; 
    } 
    std::cout<<"FINISHED: "<<std::endl; 

    return a.exec(); 
} 

답변

0

Pool::newWork()Pool::run() 사이의 통화가 있습니다.

조건 변수의 신호/브로드 캐스팅이 고정 이벤트가 아니라는 것을 기억해야합니다. 시그널링 당시 스레드가 조건 변수를 기다리지 않으면 신호가 손실됩니다. 이것은 프로그램에서 일어날 수있는 일입니다. 주 변수가 wait()을 조건 변수로 호출하기 전에 각 풀 객체의 Pool::newWork()에 전화하는 것을 차단하는 것은 없습니다.

이 문제를 해결하려면 boost::mutex WaitWorker_MUTEX을 로컬 변수가 아닌 클래스 멤버로 이동해야합니다. Pool::newWork()는 일을 업데이트하기 전에 뮤텍스를 잡기 위해 필요 :

boost::unique_lock<boost::mutex> u_lock(WaitWorker_MUTEX); 
m_Ep = &Ep; 
m_startWork.notify(); // no need to use notify_all() 

당신이 Pool::run()에서 조건 변수를 사용하고 있기 때문에, 당신은 가짜 웨이크 업을 처리해야합니다.

boost::unique_lock<boost::mutex> u_lock(WaitWorker_MUTEX); 
while (1) { 
    while (m_Ep == NULL && m_workerRun) { 
     m_startWork.wait(u_lock); 
    } 
    if (!m_workerRun) { 
     return; 
    } 
    calc(); 
    m_sem->post(); 
    m_Ep = NULL; 
} 

정지() (뮤텍스를 잡아 통지해야합니다) : 당신은 객체와 당신이 작업 항목과 완료 할 때마다 만들 때 NULL로 m_Ep를 설정하는 것이 좋습니다

boost::unique_lock<boost::mutex> u_lock(WaitWorker_MUTEX); 
m_workRun = false; 
m_startWork.notify(); 

이러한 변경 사항으로 인해 불필요한 10ms의 시간을 절약 할 수 있습니다. Pool::stop() 또는 Pool::join()으로 전화하지 않는 것 같습니다. 코드를 변경하여 코드를 호출해야합니다.

m_retPool::calc()에서 처리하면 성능이 향상됩니다. 작업을 반환 할 때도 복사본을 만들고 있습니다. Pool::getWork()이 const 참조를 m_ret으로 되 돌리는 것이 좋습니다.

다른 문제가있을 수 있으므로이 코드를 실행하지 않았습니다. 그것은 당신을 도울 것입니다

당신의 코드에서 여러분은 왜 조건 변수가 뮤텍스와 함께 있어야하는지 궁금해 할 것 같습니다. 왜냐하면 여러분은 Pool::run()에 하나의 로컬 뮤텍스를 선언했기 때문입니다. 내 수정으로 더 명확 해지기를 바랍니다.

+0

대단히 고마워요. 나는 그 경쟁 조건을 완전히 놓쳤습니다. 그리고 다른 제안들에 대해서도 많이 감사드립니다. 그들은 모두 매우 도움이되었습니다. – KmgL

0

. 스레드를 시작한 다음 wait for all 개의 스레드를 완료하십시오. 다른 동기화는 필요하지 않습니다.