2013-10-29 4 views
2

시간 초과 가능 offer()과 함께 C++의 블로킹 큐가 필요했습니다. 대기열은 인 복수의 생산자 을 대상으로합니다. 이전에 구현할 때이 필요에 맞는 기존의 대기열을 찾지 못해서 직접 코딩했습니다.C++ 블로킹 큐 Segfault 승격

대기열의 take() 메소드에서 나오는 segfaults가 표시되지만 간헐적 인 것입니다. 문제에 대한 코드를 살펴 봤지만 문제가있는 것으로 보이지 않습니다.

  • 안정적으로이 작업을 수행 기존 라이브러리가 있다는 것을 내가해야 사용 (부스트 또는 헤더 전용 권장) :

    나는 경우 궁금하네요.

  • 누구나 수정해야 할 코드에 명백한 결함이있는 것 같습니다.

    class BlockingQueue 
    { 
        public: 
         BlockingQueue(unsigned int capacity) : capacity(capacity) { }; 
         bool offer(const MyType & myType, unsigned int timeoutMillis); 
         MyType take(); 
         void put(const MyType & myType); 
         unsigned int getCapacity(); 
         unsigned int getCount(); 
    
        private: 
         std::deque<MyType> queue; 
         unsigned int capacity; 
    }; 
    

    와 관련 구현 : 여기에

헤더입니다

boost::condition_variable cond; 
boost::mutex mut; 

bool BlockingQueue::offer(const MyType & myType, unsigned int timeoutMillis) 
{ 
    Timer timer; 

    // boost::unique_lock is a scoped lock - its destructor will call unlock(). 
    // So no need for us to make that call here. 
    boost::unique_lock<boost::mutex> lock(mut); 

    // We use a while loop here because the monitor may have woken up because 
    // another producer did a PulseAll. In that case, the queue may not have 
    // room, so we need to re-check and re-wait if that is the case. 
    // We use an external stopwatch to stop the madness if we have taken too long. 
    while (queue.size() >= this->capacity) 
    { 
     int monitorTimeout = timeoutMillis - ((unsigned int) timer.getElapsedMilliSeconds()); 

     if (monitorTimeout <= 0) 
     { 
      return false; 
     } 

     if (!cond.timed_wait(lock, boost::posix_time::milliseconds(timeoutMillis))) 
     { 
      return false; 
     } 
    } 

    cond.notify_all(); 

    queue.push_back(myType); 

    return true; 
} 

void BlockingQueue::put(const MyType & myType) 
{ 
    // boost::unique_lock is a scoped lock - its destructor will call unlock(). 
    // So no need for us to make that call here. 
    boost::unique_lock<boost::mutex> lock(mut); 

    // We use a while loop here because the monitor may have woken up because 
    // another producer did a PulseAll. In that case, the queue may not have 
    // room, so we need to re-check and re-wait if that is the case. 
    // We use an external stopwatch to stop the madness if we have taken too long. 
    while (queue.size() >= this->capacity) 
    { 
     cond.wait(lock); 
    } 

    cond.notify_all(); 

    queue.push_back(myType); 
} 

MyType BlockingQueue::take() 
{ 
    // boost::unique_lock is a scoped lock - its destructor will call unlock(). 
    // So no need for us to make that call here. 
    boost::unique_lock<boost::mutex> lock(mut); 

    while (queue.size() == 0) 
    { 
     cond.wait(lock); 
    } 

    cond.notify_one(); 

    MyType myType = this->queue.front(); 

    this->queue.pop_front(); 

    return myType; 
} 

unsigned int BlockingQueue::getCapacity() 
{ 
    return this->capacity; 
} 

unsigned int BlockingQueue::getCount() 
{ 
    return this->queue.size(); 
} 

그리고 그래, 나는 템플릿을 사용하여 클래스를 구현하지 않았다 - 그 목록에 다음이다 :)

도움을 주시면 대단히 감사하겠습니다. 스레딩 문제는 핀 고정하기가 정말 어려울 수 있습니다.

-ben

+0

*이 수업을 어떻게 사용합니까? 특히 예를 들어 '가져 가라. ' 이 동작을 나타내는 [간단한 컴파일 가능한 예제] (http://sscce.org/)를 만들어보십시오. –

+0

"MyType"은 어떻게 복사됩니까? 사소한 포드 구조체입니까? –

+0

정확히 어떤 라인에서 던지나요? –

답변

0

왜 cond와 mut globals가 있습니까? 나는 그것들이 당신의 BlockingQueue 객체의 멤버가 될 것이라고 기대한다. 나는 다른 것들을 만지는 것이 무엇인지 모르지만 거기에 문제가있을 수 있습니다.

내가 너무 큰 프로젝트의 일환으로 ThreadSafeQueue을 구현 한 : 기본적으로이 있기 때문에

https://github.com/cdesjardins/QueuePtr/blob/master/include/ThreadSafeQueue.h

그것은 대기열을 제외하고, 당신 유사한 개념이 (일명 이벤트) 함수는 비 차단이다 최대 용량 없음. 용량을 강화하기 위해 일반적으로 시스템 초기화시 N 버퍼가 추가 된 풀과 런타임에 메시지 전달을위한 큐가 있으며 런타임시 메모리 할당이 필요하지 않습니다. 임베디드 애플리케이션에서 작동).

풀과 대기열의 유일한 차이점은 풀이 시스템 초기화시 대기열에 묶인 여러 개의 버퍼를 확보한다는 것입니다. 당신은 당신이 다음과 같은 일을 할 메시지를 전송하고자 할 때 다음

ThreadSafeQueue<BufferDataType*> pool; 
ThreadSafeQueue<BufferDataType*> queue; 

void init() 
{ 
    for (int i = 0; i < NUM_BUFS; i++) 
    { 
     pool.enqueue(new BufferDataType); 
    } 
} 

: 그래서 당신은 이런 식으로 뭔가를

void producerA() 
{ 
    BufferDataType *buf; 
    if (pool.waitDequeue(buf, timeout) == true) 
    { 
     initBufWithMyData(buf); 
     queue.enqueue(buf); 
    } 
} 

인큐 기능을 빠르고 쉽게,하지만이 방법 풀 인 경우 비어 있으면 다른 사람이 버퍼를 다시 풀에 넣을 때까지 차단됩니다.

void consumer() 
{ 
    BufferDataType *buf; 
    if (queue.waitDequeue(buf, timeout) == true) 
    { 
     processBufferData(buf); 
     pool.enqueue(buf); 
    } 
} 

는 어쨌든 어쩌면 도움이 될 것입니다, 그것을 살펴 : 다른 스레드가 큐에 차단됩니다 그들이 처리 된 경우 다음과 같이 풀에 버퍼를 돌려 보낼 것을 생각.

0

코드의 문제로 인해 여러 스레드가 deque를 수정한다고 가정합니다. 모양 :

  1. 다른 스레드의 코드 작성을 기다리고 있습니다.
  2. 그런 다음 deque가 수정되기 바로 전에 잠금 해제 된 다른 스레드에 즉시 신호를 보냅니다.
  3. 다른 스레드가 deque가 이미 잠금 해제되어 있고 동일한 작업을 시작하는 동안 deque를 수정합니다.

따라서 큐를 수정 한 후에 cond.notify_*()을 모두 배치 해보십시오. 즉이 :

void BlockingQueue::put(const MyType & myType) 
{ 
    boost::unique_lock<boost::mutex> lock(mut); 
    while (queue.size() >= this->capacity) 
    { 
     cond.wait(lock); 
    } 

    queue.push_back(myType); // <- modify first 

    cond.notify_all();  // <- then say to others that deque is free 
} 

더 나은 이해를 위해 나는 pthread_cond_wait()에 대해 읽어 것이 좋습니다.