2013-06-17 3 views
0

기본 스레드에서 작업자 스레드 종료에 문제가 있습니다. 지금까지 시도한 각 방법 중 하나를 사용하면 경쟁 조건 또는 데드락이 발생합니다.작업자 스레드와 기본 스레드 종료 사이의 경쟁 조건

작업자 스레드는 ThreadPool이라는 클래스 내부의 내부 클래스에 저장되며 ThreadPool은 unique_ptr을 사용하여 이러한 WorkerThreads의 벡터를 유지 관리합니다. 여기

내 ThreadPool이의 헤더입니다 :
class ThreadPool 
{ 
public: 
typedef void (*pFunc)(const wpath&, const Args&, Global::mFile_t&, std::mutex&, std::mutex&);  // function to point to 
private: 

    class WorkerThread 
    { 
    private: 
     ThreadPool* const _thisPool;  // reference enclosing class 

     // pointers to arguments 
     wpath _pPath;    // member argument that will be modifyable to running thread 
     Args * _pArgs; 
     Global::mFile_t * _pMap; 

     // flags for thread management 
     bool _terminate;     // terminate thread 
     bool _busy;       // is thread busy? 
     bool _isRunning; 

     // thread management members 

     std::mutex    _threadMtx; 
     std::condition_variable _threadCond; 
     std::thread    _thisThread; 

     // exception ptr 
     std::exception_ptr _ex; 

     // private copy constructor 
     WorkerThread(const WorkerThread&): _thisPool(nullptr) {} 
    public: 
     WorkerThread(ThreadPool&, Args&, Global::mFile_t&); 
     ~WorkerThread(); 

     void setPath(const wpath);   // sets a new task 
     void terminate();     // calls terminate on thread 
     bool busy() const;     // returns whether thread is busy doing task 
     bool isRunning() const;    // returns whether thread is still running 
     void join();      // thread join wrapper 
     std::exception_ptr exception() const; 

     // actual worker thread running tasks 
     void thisWorkerThread(); 
    }; 

    // thread specific information 
    DWORD _numProcs;      // number of processors on system 
    unsigned _numThreads;     // number of viable threads 
    std::vector<std::unique_ptr<WorkerThread>> _vThreads; // stores thread pointers - workaround for no move constructor in WorkerThread 
    pFunc _task;       // the task threads will call 

    // synchronization members 
    unsigned _barrierLimit;     // limit before barrier goes down 
    std::mutex _barrierMtx;     // mutex for barrier 
    std::condition_variable _barrierCond; // condition for barrier 
    std::mutex _coutMtx; 

public: 
    // argument mutex 
    std::mutex matchesMap_mtx; 
    std::mutex coutMatch_mtx; 

    ThreadPool(pFunc f); 

    // wake a thread and pass it a new parameter to work on 
    void callThread(const wpath&); 

    // barrier synchronization 
    void synchronizeStartingThreads(); 

    // starts and synchronizes all threads in a sleep state 
    void startThreads(Args&, Global::mFile_t&); 

    // terminate threads 
    void terminateThreads(); 

private: 
}; 

지금까지 나는 데 진짜 문제는 그 메인 스레드에서 원인 죽은 잠금 또는 경쟁 조건을 terminateThreads()를 호출 할 때.

내 _terminate 플래그를 true로 설정하면 스레드가 깨우기 및 종료 할 수있는 기회를 갖기 전에 주 스레드가 이미 범위를 벗어나 모든 뮤텍스를 제거 할 수 있습니다. 사실,이 충돌은 꽤 많이 걸렸습니다. (콘솔 창이 표시됩니다 : 바쁜 동안에 뮤텍스가 깨졌습니다)

스레드를 notify_all() 종료 된 스레드에 조인하면 프로그램이 무기한 중지되므로 조인이 발생하기 전에 무한 데드락이 발생합니다.

나는 분리하는 경우 - 위와 같은 문제가 있지만, 프로그램 충돌이 발생

내가 대신 잠시를 사용하는 경우 (WorkerThread.isRunning()) 수면 (0); WorkerThread가 마지막 닫기 중괄호에 도달하기 전에 주 스레드가 종료 될 수 있으므로 프로그램이 중단 될 수 있습니다.

모든 작업자 스레드가 안전하게 종료 될 때까지 주체를 중지하기 위해 수행해야 할 작업이 무엇인지 확실하지 않습니다. 또한 스레드 및 메인에서 try-catch를 사용하더라도 예외가 발견되지 않습니다. (내가 시도한 모든 것이 프로그램 충돌로 이어짐)

작업자 스레드가 끝날 때까지 주 스레드를 중단하려면 어떻게해야합니까? 여기

은 주요 기능의 구현은 다음과 같습니다

하는 개별 작업자 스레드를 종료

void ThreadPool::WorkerThread::terminate() 
{ 
    _terminate = true; 
    _threadCond.notify_all(); 
    _thisThread.join(); 
} 

실제 ThreadLoop

void ThreadPool::WorkerThread::thisWorkerThread() 
{ 
    _thisPool->synchronizeStartingThreads(); 

    try 
    { 
     while (!_terminate) 
     { 
      { 
       _thisPool->_coutMtx.lock(); 
       std::cout << std::this_thread::get_id() << " Sleeping..." << std::endl; 
       _thisPool->_coutMtx.unlock(); 
       _busy = false; 
       std::unique_lock<std::mutex> lock(_threadMtx); 
       _threadCond.wait(lock); 
      } 
      _thisPool->_coutMtx.lock(); 
      std::cout << std::this_thread::get_id() << " Awake..." << std::endl; 
      _thisPool->_coutMtx.unlock(); 
      if(_terminate) 
       break; 

      _thisPool->_task(_pPath, *_pArgs, *_pMap, _thisPool->coutMatch_mtx, _thisPool->matchesMap_mtx); 

      _thisPool->_coutMtx.lock(); 
      std::cout << std::this_thread::get_id() << " Finished Task..." << std::endl; 
      _thisPool->_coutMtx.unlock(); 

     } 
     _thisPool->_coutMtx.lock(); 
     std::cout << std::this_thread::get_id() << " Terminating" << std::endl; 
     _thisPool->_coutMtx.unlock(); 
    } 
    catch (const std::exception&) 
    { 
     _ex = std::current_exception(); 
    } 
    _isRunning = false; 
} 

모든 작업자 스레드를 종료

void ThreadPool::terminateThreads() 
{ 
    for (std::vector<std::unique_ptr<WorkerThread>>::iterator it = _vThreads.begin(); it != _vThreads.end(); ++it) 
    { 
     it->get()->terminate(); 
     //it->get()->_thisThread.detach(); 

     // if thread threw an exception, rethrow it in main 
     if (it->get()->exception() != nullptr) 
      std::rethrow_exception(it->get()->exception()); 
    } 
} 

그리고 마지막으로, 스레드 풀을 호출하는 기능은 일리노이

// scans a path recursively for all files of selected extension type, calls thread to parse file 
unsigned int Functions::Scan(wpath path, const Args& args, ThreadPool& pool) 
{ 
    wrecursive_directory_iterator d(path), e; 
    unsigned int filesFound = 0; 
    while (d != e) 
    { 
     if (args.verbose()) 
      std::wcout << L"Grepping: " << d->path().string() << std::endl; 

     for (Args::ext_T::const_iterator it = args.extension().cbegin(); it != args.extension().cend(); ++it) 
     { 
      if (extension(d->path()) == *it) 
      { 
       ++filesFound; 
       pool.callThread(d->path()); 
      } 
     } 
     ++d; 
    } 

    std::cout << "Scan Function: Calling TerminateThreads() " << std::endl; 
    pool.terminateThreads(); 
    std::cout << "Scan Function: Called TerminateThreads() " << std::endl; 
    return filesFound; 
} 

다시 질문을 반복 (스캔 기능은 메인에서 실행) : 나는 작업자 스레드가 완료 될 때까지 메인 스레드를 중단하기 위해 무엇을 할 수 있는가?

+0

을 thisWorkerThread. std :: atomic을 사용해야합니다. 여기서 유일한 문제는 아니며 작업자 스레드를 차단하고 종료 요청을 관찰 할 수 없습니다. 일반적으로 C++ 11이 [std :: quick_exit()] (http://en.cppreference.com/w/cpp/utility/program/quick_exit)을 추가 한 이유는 해결하기가 어렵습니다. –

답변

0

문제였다 두 배 :.

synchronizeStartingThreads() 때로는 괜찮 앞서 갈의 동안 (some_condition) barrierCond.wait (잠금에서 (문제를 기다리고, 1 개 또는 2 개의 스레드가 차단 된 것)을 while 루프를 제거하면이 블로킹 문제가 해결되었습니다.

두 번째 문제점은 _threadMtx를 입력 할 가능성이 있고 notify_all이 _threadCond.wait()를 입력하기 직전에 호출 되었기 때문입니다. 알림이 이미 호출 되었기 때문에 스레드가 영원히 기다릴 것입니다.

ie. 놀랍게도이 뮤텍스를 terminate()에 잠그면 발생하지 않습니다.

는 _threadCond.wait()

는 또한 체크는 반드시 동일한 작업을 다시 처리되지 않았는지 태스크의 개시 이전에 첨가 된은 30ms의 타임 아웃을 부가함으로써 해결되었다.

새로운 코드는 이제 다음과 같습니다

당신은 스레드의 메인 루프에서 부울을 사용하고 true로 설정하는 것은 스레드에서 관찰 할 수 있기를 바랍니다 수 없습니다

_threadCond.wait_for(lock, std::chrono::milliseconds(30)); // hold the lock a max of 30ms 

// after the lock, and the termination check 

if(_busy) 
     { 
      Global::mFile_t rMap = _thisPool->_task(_pPath, *_pArgs, _thisPool->coutMatch_mtx); 
      _workerMap.element.insert(rMap.element.begin(), rMap.element.end()); 
     } 
1

스레드 종료 및 가입과 관련된 문제가 발생하지 않습니다.

스레드 가입은 주어진 스레드가 종료 될 때까지 기다리는 것과 관련되어 있습니다. 따라서 원하는 작업을 수행 할 수 있습니다. 스레드가 이미 실행을 마친 경우 join은 즉시 반환됩니다.

코드에서 이미 수행 한대로 terminate 호출 중 각 스레드에 참여하기 만하면됩니다.

참고 : 방금 ​​종료 한 스레드의 활성이 exception_ptr 인 경우 현재 모든 예외가 즉시 재발행됩니다. 이로 인해 연결되지 않은 스레드가 생길 수 있습니다. 당신은 그 예외

업데이트 처리 할 때 명심해야 할 것이다 : 코드보고 후을, 나는 잠재적 인 버그를 참조하십시오 가짜 웨이크가 발생했을 때 std::condition_variable::wait()가 반환 할 수 있습니다. 이 경우 마지막으로 작업 한 경로에서 다시 작업하여 잘못된 결과를 초래할 수 있습니다. 새로운 작업이 추가 된 경우 설정된 새 작업에 대한 플래그가 있어야하고 _threadCond.wait(lock) 행은 플래그를 확인하는 루프에 있어야하고 _terminate이어야합니다. 하지만 그 중 하나가 문제를 해결할 지 확신하지 못합니다.

+0

위의 코드는 20 %의 시간에만 작동합니다. 프로그램에 참여할 때마다 프로그램이 무기한 중지됩니다. 조인을 제거하면 죽은 잠금이 발생하지 않지만 스레드가 수행하기 전에 주 종료됩니다. join은 어떻게 든 교착 상태를 일으킨다. – Igneous01

+0

두 개 이상의 스레드 사이에 dealock이있을 수 있습니까? 변수를 공유합니까? 이 경우'join' 호출을 제거하면 주 스레드가 종료 될 때 하위 스레드를 강제 종료하여 교착 상태를 숨길 수 있습니다. –

+0

예. 모든 작업자 스레드는 1 개의 주요 변수 - 맵 맵을 공유합니다. 지도에서 요소 삽입을 잠그기 위해 함수 포인터를 통해 전달할 뮤텍스가 있습니다. 그러나지도가 아직 가끔씩 잘못된 데이터를 반환하고 있기 때문에 지금은 아무 것도하지 않는 것으로 보입니다. 게다가 포인트 -> 프로그램은이 시점을 지나서 이미 맵이 채워져 있고이 시간까지 이미 스캔 된 파일입니다. 그것은 terminateThreads()를 호출 할 때만 무한정 정지됩니다.그래서 나는이 시간에 모두 자고있을 때 스레드에서 교착 상태를 보지 못합니다. – Igneous01

관련 문제