2011-01-13 4 views
3

1. 소켓 연결을 열고 데이터를 읽음 2) 개행 문자로 데이터 분할 3) 데이터 세그먼트를 대기열로 보냅니다. 별도의 스레드에서 처리됩니다.많은 양의 텍스트 구문 분석

인증 및 DNS 조회를 처리 할 때 curlpp 라이브러리를 사용하고 있습니다. 대기열은 스레드 안전을위한 뮤텍스가있는 양단 큐일뿐입니다.

이것은 현재 사용중인 방법입니다.

std::string input; 
size_t socketIO::dataCallBack(char* ptr, size_t size, size_t nmemb) { 
    // Calculate the real size of the incoming buffer 
    size_t realsize = size * nmemb; 

    //Append the new input to the old input 
    input.append(ptr, realsize); 

    //Find all the complete strings and push them to the queue 
    size_t oldPosition = 0; 
    size_t position = 0; 
    position = input.find('\r', oldPosition); 
    while (position != std::string::npos) { 
     queueObject.push(input.substr(oldPosition, position)) 
     oldPosition = position + 1; 
     position = input.find('\r', oldPosition); 
    } 

    //Save off the partial string as you'll get the rest of it on the next data callback 
    input = input.substr(oldPosition); 

    return realsize; 
} 

나는 몇 가지 문제점이 있습니다. 메모리 누수 문제가 발생하여 valgrind가이 함수에서 큰 누출을 보이고 있습니다.

==12867== 813,287,102 bytes in 390,337 blocks are possibly lost in loss record 359 of 359 
==12867== at 0x4C27CC1: operator new(unsigned long) (vg_replace_malloc.c:261) 
==12867== by 0x5AA8D98: std::string::_Rep::_S_create(unsigned long, unsigned long, std::allocator<char> const&) (in /usr/lib/libstdc++.so.6.0.13) 
==12867== by 0x5AA9B64: ??? (in /usr/lib/libstdc++.so.6.0.13) 
==12867== by 0x5AA9D38: std::basic_string<char, std::char_traits<char>, std::allocator<char> >::basic_string(std::string const&, unsigned long, unsigned long) (in /usr/lib/libstdc++.so.6.0.13) 
==12867== by 0x41E4F5: socketIO::write(char*, unsigned long, unsigned long) (basic_string.h:2006) 
==12867== by 0x509C657: utilspp::Functor<unsigned long, utilspp::tl::TypeList<char*, utilspp::tl::TypeList<unsigned long, utilspp::tl::TypeList<unsigned long, utilspp::NullType> > > >::operator()(char*, unsigned long, unsigned long) (Functor.hpp:106) 
==12867== by 0x509B6E4: curlpp::internal::CurlHandle::executeWriteFunctor(char*, unsigned long, unsigned long) (CurlHandle.cpp:171) 
==12867== by 0x509F509: curlpp::internal::Callbacks::WriteCallback(char*, unsigned long, unsigned long, curlpp::internal::CurlHandle*) (OptionSetter.cpp:47) 
==12867== by 0x4E3D667: ??? (in /usr/lib/libcurl-gnutls.so.4.1.1) 
==12867== by 0x4E5407B: ??? (in /usr/lib/libcurl-gnutls.so.4.1.1) 
==12867== by 0x4E505A1: ??? (in /usr/lib/libcurl-gnutls.so.4.1.1) 
==12867== by 0x4E51A8F: ??? (in /usr/lib/libcurl-gnutls.so.4.1.1) 
==12867== by 0x509A78B: curlpp::internal::CurlHandle::perform() (CurlHandle.cpp:52) 
==12867== by 0x5093A6B: curlpp::Easy::perform() (Easy.cpp:48) 
==12867== by 0x41EDC3: socketIO::processLoop() (socketIO.cpp:126) 

무엇을 하시겠습니까? 나는 istringstream을 사용하는 것을 고려해 봤지만, 메모리 할당이 어떻게 작동하는지 그리고 내가 이미 읽은 메모리를 되 찾을 것인지 잘 모르겠습니다. 콜백간에 데이터를 유지해야하지만 메모리가 누출되지 않는 방식으로 데이터를 처리해야한다는 점에서 문제가 있습니다.

업데이트 요청하신 코드가 더 있습니다. 나는 더 많은 것이 더 좋다는 생각으로 올렸다.

MAIN.CPP

/** 
* The main driver for the twitter capture app. Starts multiple threads for processors, 1 io thread and 2 db threads. One for user 
* information and the other for tweet information 
*/ 

#include "types.h" 
#include "threadBase.h" 
#include "socketIO.h" 
#include "processor.h" 
#include "dbTweetQueue.h" 
#include "dbUserQueue.h" 

#include <vector> 


stringQueue twitToProc; 
tweetQueue tweetQ; 
userQueue userQ; 
deleteQueue deleteQ; 
std::vector<ThreadBase *> threadGroup; 

std::string dbBase::dbUser(DBUSER); 
std::string dbBase::dbURL(DBURL); 
std::string dbBase::dbPass(DBPASS); 

/* 
* Handle the signal for interupt 
*/ 
void sigquit(int param) 
{ 
    std::cout<<"Received sigquit"<<std::endl; 
    for(unsigned int i = 0; i < threadGroup.size(); i++) 
    { 
     threadGroup[i]->interupt(); 
    } 
} 


int main(int argc, char* argv[]) 
{ 
    try{ 
    //Setting the signal handler up. 
    struct sigaction act; 
    act.sa_handler = sigquit; 
    sigemptyset(&act.sa_mask); 
    act.sa_flags = 0; 
    sigaction(SIGQUIT, &act, 0); 


    int MaxThreads = 5; 
    if(argc < 3) 
    { 
     std::cout<<"Usage: >"<<argv[0]<<" TwitterUserName TwitterPassWord"<<std::endl; 
     std::cout<<"Using Defaults: "<<TWITTERACCT<<" "<<TWITTERPASS<<std::endl; 
    } 

    // Create socketIO, and add it to the thread group 
    if(argc == 3) 
    { 
     threadGroup.push_back(new socketIO(twitToProc, argv[1], argv[2])); 
    } 
    else 
    { 
     threadGroup.push_back(new socketIO(twitToProc)); 
    } 


    // Create processorThreads and add them to the thread group 
    for(int i = 0; i < MaxThreads; i++) 
    { 
     threadGroup.push_back(new processor(twitToProc, tweetQ, deleteQ, userQ)); 
    } 

    //Create DB Threads and add them to the thread group. 
    threadGroup.push_back(new dbTweetQueue(tweetQ, deleteQ)); 
    threadGroup.push_back(new dbUserQueue(userQ)); 


    // Start the threads 
    for(unsigned int i = 0; i < threadGroup.size(); i++) 
    { 
     threadGroup[i]->start(); 
    } 

    // Join the threads 
    for(unsigned int i = 0; i < threadGroup.size(); i++) 
    { 
     threadGroup[i]->join(); 
    } 

      } catch (std::exception & e) { 
      std::cerr << e.what() << std::endl; 
     } 

    for(unsigned int i = 0; i < threadGroup.size(); i++) 
    { 
     threadGroup[i]->(); 
    } 
    return 0; 
} 

threadBase.h

#ifndef _THREADBASE_H 
#define _THREADBASE_H 

#include <boost/thread.hpp> 

class ThreadBase 
{ 
public: 
    virtual void join() = 0; 
    virtual void start() = 0; 
    void interupt(){thread.interrupt();} 
protected: 
    boost::thread thread; 

}; 



#endif /* _THREADBASE_H */ 

socketIO.h

#ifndef _SOCKETIO_H 
#define _SOCKETIO_H 

#include "types.h" 
#include "threadBase.h" 

#include <boost/bind.hpp> 
#include <curlpp/cURLpp.hpp> 
#include <curlpp/Multi.hpp> 
#include <curlpp/Easy.hpp> 
#include <curlpp/Options.hpp> 
#include <curlpp/Exception.hpp> 
#include <curlpp/Infos.hpp> 
#include <curl/curl.h> 

#include <signal.h> 
#include <string> 
#include <sstream> 
#include <cstdlib> 


#define defaultRepeatInterval 10; 

class socketIO: public ThreadBase { 
private: 
    int repeatInterval; 
    double previousDownloadSize; 
    int failCount; 
    int writeRound; 
    std::string userPassword; 
    stringQueue& queueObject; 
    std::string input; 


public: 
    socketIO(stringQueue & messageQueue): 
       queueObject(messageQueue) 
    { 
     userPassword.append(TWITTERACCT); 
     userPassword.append(":"); 
     userPassword.append(TWITTERPASS); 
    } 

    socketIO(stringQueue & messageQueue, char* userName, char* password): 
       queueObject(messageQueue) 
    { 
     userPassword.append(userName); 
     userPassword.append(":"); 
     userPassword.append(password); 
    } 

    virtual ~socketIO(); 

    void join(); 
    void start(); 
    std::auto_ptr<curlpp::Easy> createRequest(int); 



    void processLoop(); 
    size_t write(char* ptr, size_t size, size_t nmemb); 
    int progress(double, double, double, double); 

}; 

#endif /* _SOCKETIO_H */ 

socketIO.cpp

#include "socketIO.h" 

socketIO::~socketIO() { 
} 

/* 
* This method starts a new thread with the processLoop method 
*/ 
void socketIO::start() { 
    thread = boost::thread(&socketIO::processLoop, this); 
} 

/* 
* This method blocks waiting for the thread to exit 
*/ 
void socketIO::join() { 
    thread.join(); 
} 

/* 
* The datacall back function for the open twitter connection.\ 
*/ 
size_t socketIO::write(char* ptr, size_t size, size_t nmemb) { 
    // Calculate the real size of the incoming buffer 
    size_t realsize = size * nmemb; 
    std::string temp; 
    temp.append(input); 
    temp.append(ptr, realsize); 
    size_t oldPosition = 0; 
    size_t position = 0; 
    position = temp.find('\r', oldPosition); 
    while (position != std::string::npos) { 
     queueObject.push(temp.substr(oldPosition, position)); 
     ++writeRound; 
     oldPosition = position + 1; 
     position = temp.find('\r', oldPosition); 
    } 
    input = temp.substr(oldPosition); 
    return realsize; 
} 

/* 
* The timed callback function, called every second, used to monitor that the connection is still receiving data 
* Return 1 if requesting break or data flow stops, 0 if continuing normally 
*/ 
int socketIO::progress(double dltotal, double dlnow, double ultotal, double ulnow) { 
    // Allows us to break out on interruption 
    if (boost::this_thread::interruption_requested()) 
     return 1; 

    if (dlnow == previousDownloadSize) { 
     if (failCount < 15) 
      failCount++; 
     else { 
      repeatInterval = repeatInterval * 2; 
      return 1; 
     } 
    } else { 
     repeatInterval = 10; 
     previousDownloadSize = dlnow; 
    } 
    return 0; 
} 

/* 
* This method creates a new connection to the twitter service with the required settings 
*/ 
std::auto_ptr<curlpp::Easy> socketIO::createRequest(int source) { 
    //Reset the input buffer when the connection is made. 
    input = std::string(""); 
    std::auto_ptr<curlpp::Easy> newRequest(new curlpp::Easy); 

    curlpp::types::ProgressFunctionFunctor progressFunctor(this, &socketIO::progress); 
    newRequest->setOpt(new curlpp::options::ProgressFunction(progressFunctor)); 

    curlpp::types::WriteFunctionFunctor functor(this, &socketIO::write); 
    newRequest->setOpt(new curlpp::options::WriteFunction(functor)); 

    newRequest->setOpt(new curlpp::options::FailOnError(true)); 
    newRequest->setOpt(new curlpp::options::NoProgress(0)); 
    newRequest->setOpt(new curlpp::options::Verbose(true)); 
    newRequest->setOpt(new curlpp::options::UserPwd(userPassword)); 


    //Code for debugging and using alternate sources 
    std::string params = "track=basketball,football,baseball,footy,soccer"; 

    switch (source) { 
     case 1: // Testing Locally 
      newRequest->setOpt(new curlpp::options::Url("127.0.0.1:17000")); 
      break; 
     case 2: // Filtered 
      newRequest->setOpt(new curlpp::options::Url("http://stream.twitter.com/1/statuses/filter.json")); 
      newRequest->setOpt(new curlpp::options::PostFields(params)); 
      newRequest->setOpt(new curlpp::options::PostFieldSize(params.size())); 
      break; 
     case 3: //Twitter Main Stream 
      newRequest->setOpt(new curlpp::options::Url("http://stream.twitter.com/1/statuses/sample.json")); 
      break; 
    } 

    return newRequest; 
} 


/* 
* The main method of the thread. Creates a new instance of the request 
*/ 
void socketIO::processLoop() { 
    repeatInterval = defaultRepeatInterval; 
    std::auto_ptr<curlpp::Easy> request; 
    while (true) { 
     try { 
      previousDownloadSize = 0; 
      failCount = 0; 
      request.reset(createRequest(3)); 
      request->perform(); 
     } catch (curlpp::UnknowException & e) { 
      std::cout << "Unknown Exception: " << e.what() << std::endl; 
     } catch (curlpp::RuntimeError & e) { 
      std::cout << "Runtime Exception: " << e.what() << std::endl; 
     } catch (curlpp::LogicError & e) { 
      std::cout << "Logic Exception: " << e.what() << std::endl; 
     } 


     if (boost::this_thread::interruption_requested()) 
      break; 
     else 
      boost::this_thread::sleep(boost::posix_time::seconds(repeatInterval)); 
    } 
} 
,

난 정말이 응답의 확신 할 수있는 충분한 정보를 가지고 있지만, 여기 내 추측하지 않는 것이

#ifndef _CONCURRENT_QUEUE_ 
#define _CONCURRENT_QUEUE_ 

#include <boost/thread/mutex.hpp> 
#include <boost/thread/condition_variable.hpp> 
#include <deque> 

template<typename Data> 
class concurrent_queue 
{ 
private: 
    std::deque<Data> the_queue; 
    mutable boost::mutex the_mutex; 
    boost::condition_variable the_condition_variable; 
public: 
    void push(Data const& data) 
    { 
     boost::mutex::scoped_lock lock(the_mutex); 
     the_queue.push_back(data); 
     lock.unlock(); 
     the_condition_variable.notify_one(); 
    } 

    bool empty() const 
    { 
     boost::mutex::scoped_lock lock(the_mutex); 
     return the_queue.empty(); 
    } 

    bool try_pop(Data& popped_value) 
    { 
     boost::mutex::scoped_lock lock(the_mutex); 
     if(the_queue.empty()) 
     { 
      return false; 
     } 

     popped_value=the_queue.front(); 
     the_queue.pop_front(); 
     return true; 
    } 

    void wait_and_pop(Data& popped_value) 
    { 
     boost::mutex::scoped_lock lock(the_mutex); 
     while(the_queue.empty()) 
     { 
      the_condition_variable.wait(lock); 
     } 

     popped_value=the_queue.front(); 
     the_queue.pop_front(); 
    } 

}; 

#endif /* _CONCURRENT_QUEUE_ */ 
+0

valvgrind 백 트레이스가 전부입니까? 나는 당신의 코드를 보지 못했지만 아마 그것을 놓쳤을 것입니다. 방금 libcurl과 socketIO 항목이 있습니다. 더 이상 함수에'new' 호출이 없으므로'append' 만 호출하면됩니다. 사용하고있는 STL 구현이이 기능을 제대로 사용하기를 바랍니다. – EnabrenTane

+0

socketIO가 파괴 되었습니까? 나는 "std :: string input"이 socketIO의 멤버라고 가정하고있다. – Mic

+0

\ r은 캐리지 리턴이며 네트워크 프로토콜 (오늘)은 일반적으로 \ r \ n 또는 \ n을 사용합니다.이것은 누출을 일으키지 않는 것처럼 보이지만, 아마도 그것은 공헌하는 문제입니까? 당신이 따르는 프로토콜이 라인을 종료하기 위해 하나의 \ r을 사용한다고 확신하십니까? –

답변

3

concurrent_queue.hpp

#ifndef _TYPES_H 
#define _TYPES_H 

#include <string> 
#include <concurrent_queue.hpp> 

#define DBUSER "****" 
#define DBPASS "****" 
#define DBURL "****" 
#define TWITTERACCT "****" 
#define TWITTERPASS "****" 

typedef struct tweet { 
... 
} tweet; 

typedef struct user { 
... 
} user; 


typedef concurrent_queue<std::string> stringQueue; 
typedef std::pair<int, std::string> dbPair; 
typedef concurrent_queue<dbPair> dbQueue; 

typedef concurrent_queue<tweet> tweetQueue; 
typedef concurrent_queue<user> userQueue; 
typedef concurrent_queue<boost::int64_t> deleteQueue; 

#endif /* _TYPES_H */ 

types.h. 메모리가 할당 된 경우에 대한 Valgrind의 스택을보고

, 당신은 참조 : 꽤 많은 단지 문자열을 의미

==12867== 813,287,102 bytes in 390,337 blocks are possibly lost in loss record 359 of 359 
==12867== at 0x4C27CC1: operator new(unsigned long) (vg_replace_malloc.c:261) 
==12867== by 0x5AA8D98: std::string::_Rep::_S_create(unsigned long, unsigned long, std::allocator<char> const&) (in /usr/lib/libstdc++.so.6.0.13) 
==12867== by 0x5AA9B64: ??? (in /usr/lib/libstdc++.so.6.0.13) 
==12867== by 0x5AA9D38: std::basic_string<char, std::char_traits<char>, std::allocator<char> >::basic_string(std::string const&, unsigned long, unsigned long) (in /usr/lib/libstdc++.so.6.0.13) 
==12867== by 0x41E4F5: socketIO::write(char*, unsigned long, unsigned long) (basic_string.h:2006) 

은 당신의 쓰기 방식으로 만들어졌다. std :: string은 대부분의 stl 컨테이너와 마찬가지로 필요할 때까지 힙에 아무 것도 할당하지 않습니다.이 경우에는 데이터를 추가 할 때입니다.

이제는 메모리가 할당되었지만 괜찮습니다. 그러나 std :: string 입력의 소멸자가 절대 호출되지 않기 때문에 절대 할당이 해제되지 않습니다. 이에 대한 몇 가지 이유가있을 수 있지만 가장 일반적인 이유는 다음과 같습니다.

  • 힙이 할당되어 socketIO를 할당하는 것을 잊었습니다.
  • 가상 함수가 있지만 어딘가에 가상 소멸자를 잊어 버렸습니다.
+0

이 문제를 정의하는 방법을 모르겠습니다. 나는 그것이 메모리 누수인지 또는 가출 메모리인지 확실하지 않지만 어느 것이 든 문제 다. 이 프로그램은 매우 오랜 기간 동안 데이터 캡처를 실행하기위한 것이므로 파괴되지 않습니다. –

1

이 코드는 다른 곳에서나이 코드와 함께 또는 다른 곳에서 나타납니다.

게시 한 코드에 메모리 누수가 없습니다. 중요한 세부 사항을 빠뜨린 SO에 게시 할 코드를 변형했을 가능성이 있습니다. 예를 들어, 큐를 잠그지 않은 상태에서 (실제로는 필요하다고 말하면서) 큐를 잠그지 않았기 때문에 손상과 유출로 이어질 수 있습니다. 또 다른 예는 입력 변수입니다. 실제로 글로벌입니까 아니면 데이터 멤버입니까? Mic mentions 전사 오류 일 수도 있고 아닐 수도있는 몇 가지 잠재적 오류가 있습니다.

우리는 실제로 문제를 보여주는 완전한 컴파일 가능한 예제가 필요합니다.

+0

대기열 구현을 포함하여이 프로그램의 대부분의 코드를 게시했습니다. 입력은 거의 전체 프로그램의 수명을 가진 클래스의 데이터 멤버입니다. –

2

ThreadBase에는 가상 소멸자가 없습니다.

객체가 가리키는마다 ThreadBase*delete을 적용한 결과는 ThreadBase 아니지만 파생 형 따라서 정의이다. 실제로 파생 클래스 중 하나가 메모리를 (직접 또는 간접적으로) 할당하면 누수가되는 경우가 많습니다. 보기의 디자인 관점에서

class ThreadBase 
{ 
public: 
    virtual ~ThreadBase() {} // <-- There you go! 

    virtual void join() = 0; 
    virtual void start() = 0; 

    void interupt() { thread.interrupt(); } 

protected: 
    boost::thread thread; 
}; 

:

  • 을 피 protected 속성은, 사용을 캡슐화하는 방법을 제공하는 것을 선호합니다.
  • NVI (가상 인터페이스가 아닌) 관용구는 publicvirtual 인 메소드를 사용하는 것은 나쁜 생각 (예를 들어 전제 조건과 사후 조건을 확인할 수 없음)을 명시하고 있으며 비공개 메소드를 호출하는 비회원 공개 메소드를 사용하는 것이 좋습니다 구현 세부 사항에 대한 가상 메소드.
  • ThreadBaseboost::noncopyable에서 개인적으로 상속되어 복사 할 수 없다고 기록 할 수 있습니다.
2

은 내가하지만 경우 다른 사람 (더 사실 몇 개월) 여기 오히려 후반이 스레드를 따르고 있습니다에 무게입니다 알고, 나는 비슷한 문제를 가지고 나는 curlpp에 그것을 아래로 추적 관리했습니다 라이브러리.

내가 말한 것은 C++ 전문가가 아니기 때문에 내가 라이브러리를 사용하는 방식이라고 확신한다. 그들은 RAII 스타일의 메모리 정리를 사용하지만 내 요청 (프로그램 실행 중에 다시 사용됨) 내에서 명시 적으로 옵션 설정을 만들고 파괴하는 경우조차도 프로세스의 메모리 사용량이 계속 증가하는 것으로 나타났습니다.

curlpp 라이브러리에 대한 호출을 제거하면 프로그램은 메모리 요구 사항까지 매우 정적으로 실행됩니다. 그들이 제공하는 예제의 대부분은 main()을 사용하여 무언가를 끝내고 종료하는 간단한 프로그램이므로 인스턴스화시 생성되는 Easy HTTPClient (내가 사용하고있는 클래스)를 사용하는 실행 파일과 같은 데몬을 만드는 것은 쉽지 않습니다. 프로그램 실행 중에 다시 사용됩니다.

+0

원래 나는 같은 문제를 겪고 있다고 생각했지만, 언급 한 문제는 발견하지 못했습니다. 내가 특정 호스트에 대한 연결을 유지하고 연결을 유지하면서 하루에 100 또는 1000의 연결을 열지는 않지만 2 개월 동안 눈에 띄는 메모리 누출없이 6634 연결을 열었습니다. 프로그램이 시작된 이래로 메모리 소비는 꾸준했습니다. 라이브러리에 대한 광범위한 테스트를 수행하지는 않았지만, 적어도 내 상황에서는 메모리 누수가 문제가되지 않았습니다. –

관련 문제