2011-10-28 3 views
0

나는 멀티 스레드 웹 서버를 만들어야하는 학교 프로젝트에서 일하고있다. 문제가 어디에 내가 sem_wait 내 세마포 (0으로 초기화해야하지만 이미 1시에 sem_post() 것 같다)를 호출하는 데 문제가 있습니다. 나는 SIGABRT를 얻는다.POSIX sem_wait() SIGABRT

아래 코드를 첨부하고 문제의 원인이되는 행에 주석을 달았습니다. 나는 운이 좋은 디버거로 몇 시간을 보냈다.

#include <iostream> 
#include <sys/types.h> 
#include <sys/socket.h> 
#include <sys/stat.h> 
#include <netinet/in.h> 
#include <netdb.h> 
#include <string> 
#include <string.h> 
#include <iostream> 
#include <fcntl.h> 
#include <errno.h> 
#include <pthread.h> 
#include <vector> 
#include <semaphore.h> 
#include <stdio.h> 
#include <cstdlib> 
#include <strings.h> 

#define PORTNUM 7000 
#define NUM_OF_THREADS 5 
#define oops(msg) { perror(msg); exit(1);} 
#define FCFS 0 
#define SJF 1; 

void bindAndListen(); 
void acceptConnection(int socket_file_descriptor); 
void* dispatchJobs(void*); 
void* replyToClient(void* pos); 

//holds ids of worker threads 
pthread_t threads[NUM_OF_THREADS]; 

//mutex variable for sleep_signal_cond 
pthread_mutex_t sleep_signal_mutex[NUM_OF_THREADS]; 
//holds the condition variables to signal when the thread should be unblocked 
pthread_cond_t sleep_signal_cond[NUM_OF_THREADS]; 

//mutex for accessing sleeping_thread_list 
pthread_mutex_t sleeping_threads_mutex = PTHREAD_MUTEX_INITIALIZER; 
//list of which threads are sleeping so they can be signaled and given a job 
std::vector<bool> *sleeping_threads_list = new std::vector<bool>(); 

//number of threads ready for jobs 
sem_t available_threads; 
sem_t waiting_jobs; 


//holds requests waiting to be given to one of the threads for execution 
//request implemented as int[3] with int[0]== socket_descriptor int[1]== file_size int[2]== file_descriptor of requested file 
//if file_size == 0 then HEAD request 
std::vector<std::vector<int> >* jobs = new std::vector<std::vector<int> >(); 

pthread_mutex_t jobs_mutex = PTHREAD_MUTEX_INITIALIZER; 


int main (int argc, char * const argv[]) { 
    //holds id for thread responsible for removing jobs from ready queue and assigning them to worker thread 
    pthread_t dispatcher_thread; 

    //initializes semaphores 
    if(sem_init(&available_threads, 0, NUM_OF_THREADS) != 0){ 
     oops("Error Initializing Semaphore"); 
    } 

    if(sem_init(&waiting_jobs, 0, 0) !=0){ 
     oops("Error Initializing Semaphore"); 
    } 

    //initializes condition variables and guarding mutexes 
    for(int i=0; i<NUM_OF_THREADS; i++){ 
     pthread_cond_init(&sleep_signal_cond[i], NULL); 
     pthread_mutex_init(&sleep_signal_mutex[i], NULL); 
    } 

    if(pthread_create(&dispatcher_thread, NULL, dispatchJobs, (void*)NULL) !=0){ 
     oops("Error Creating Distributer Thread"); 
    } 

    for (int i=0; i<NUM_OF_THREADS; i++) { 
     pthread_mutex_lock(&sleeping_threads_mutex); 
     printf("before"); 
     sleeping_threads_list->push_back(true); 
     printf("after"); 
     pthread_mutex_unlock(&sleeping_threads_mutex); 
    } 

    printf("here"); 
    for (int i=0; i<NUM_OF_THREADS; i++) { 
     //creates threads and stores ID in threads 
     if(pthread_create(&threads[i], NULL, replyToClient, (void*)i) !=0){ 
      oops("Error Creating Thread"); 
     } 
    } 

    /* 
    if(sem_init(&available_threads, 0, NUM_OF_THREADS) !=0){ 
     oops("Error Initializing Semaphore"); 
    } 

    if(sem_init(&waiting_jobs, 0, 0) !=0){     //this is the semaphore thats used in the sem_wait 
     oops("Error Initializing Semaphore"); 
    }*/ 

    bindAndListen(); 
} 


//binds to socket and listens for connections 
//being done by main thead 
void bindAndListen(){ 
    struct sockaddr_in saddr; 
    struct hostent *hp; 
    char hostname[256]; 
    int sock_id, sock_fd; 

    gethostname(hostname, 256); 
    hp = gethostbyname(hostname); 
    bzero(&saddr, sizeof(saddr)); 

    //errno = 0; 

    bcopy(hp->h_addr, &saddr.sin_addr, hp->h_length); 

    saddr.sin_family = AF_INET; 
    saddr.sin_port = htons(PORTNUM); 
    saddr.sin_addr.s_addr = INADDR_ANY; 

    sock_id = socket(AF_INET, SOCK_STREAM, 0); 

    if(sock_id == -1){ 
     oops("socket"); 
     printf("socket"); 
    } 

    if(bind(sock_id, (const sockaddr*)&saddr, sizeof(saddr)) ==0){ 

     if(listen(sock_id, 5) ==-1){ 
      oops("listen"); 
     } 

     //each time a new connection is accepted, get file info and push to ready queue 
     while(1){ 
      int addrlen = sizeof(saddr); 
      sock_fd = accept(sock_id, (sockaddr*)&saddr, (socklen_t*)&addrlen); 
      if (sock_fd > 0) { 
       acceptConnection(sock_fd); 
      }else { 
       oops("Error Accepting Connection"); 
      } 
     } 
    }else{ 
     oops("there was an error binding to socket"); 
    } 
}// end of bindAndListen() 


//accepts connection and gets file info of requested file 
//being done by main thread 
void acceptConnection(int sock_fd){ 
    printf("**Server: A new client connected!"); 

    //only using loop so on error we can break out on error 
    while(true){ 
     //used to hold input from client 
     char* inputBuff = new char[BUFSIZ]; 
     int slen = read(sock_fd, inputBuff, BUFSIZ); 

     //will sit on space between HEAD/GET and path 
     int pos1 = 0; 
     //will sit on space between path and HTTP version 
     int pos2 = 0; 

     //need duplicate ptr so we can manipulate one in the loop 
     char* buffPtr = inputBuff; 
     //parses client input breaks up query by spaces 
     for(int i=0; i<slen; i++){ 
      if(*buffPtr == ' '){ 
       if (pos1 == 0) { 
        pos1 = i; 
       }else { 
        pos2 = i; 
        break; 
       } 
      } 
      buffPtr++; 
     } 

     if((pos1 - pos2) >=0){ 
      std::string str = "Invalid Query"; 
      write(sock_fd, str.c_str(), strlen(str.c_str())); 
      break; 
     } 

     printf("slen length %d\n", slen); 

     std::string* method = new std::string(inputBuff, pos1); 

     printf("method length %lu\n",method->length()); 

     //increment the ptr for buff to the starting pos of the path 
     inputBuff += ++pos1; 

     printf("pos2 - pos1 %d\n", (pos2 - pos1)); 

     printf("pos1 = %d pos2 = %d\n", pos1, pos2); 

     std::string* path = new std::string(inputBuff, (pos2 - pos1)); 

     printf("path length %lu\n", path->length()); 

     printf("part1 %s\n", method->c_str()); 

     printf("part2 %s\n", path->c_str()); 

     //opens file requested by client 
     int fd = open(path->c_str(), O_RDONLY); 
     if(fd < 0){ 
      std::string* error = new std::string("Error Opening File"); 
      *error += *path + std::string(strerror(errno), strlen(strerror(errno))); 
      write(sock_fd, error->c_str(), strlen(error->c_str())); 
      break; 
     } 

     int file_size; 
     if(method->compare("GET") == 0){ 
      //gets file info and puts the resulting struct in file_info 
      struct stat file_info; 
      if(fstat(fd, &file_info) !=0){ 
       oops("Error getting file info"); 
      } 
      file_size = file_info.st_size; 
     }else if(method->compare("HEAD")){ 
      file_size = 0; 
     }else{ 
      write(sock_fd, "Invalid Query", strlen("Invalid Query")); 
      break; 
     } 

     //job to be pushed to ready queue 
     std::vector<int> job; 
     job.push_back(sock_fd); 
     job.push_back(file_size); 
     job.push_back(fd); 

     //check mutex guarding the ready queue 
     pthread_mutex_lock(&jobs_mutex); 
     //push job to back of ready queue 
     jobs->push_back(job); 
     //unlock mutex guarding the ready queue 
     pthread_mutex_unlock(&jobs_mutex); 

     //increment number of jobs in ready queue 
     sem_post(&waiting_jobs); 

    } //end of while(true) 
     // we only end up here if there was an error 
    fflush(stdout); 
    close(sock_fd); 
}// end of acceptConnection() 


//routine run by dispather thread 
void *dispatchJobs(void*){ 
    while(true){ 
     //wait for a thread to be available to execute a job 
     sem_wait(&available_threads); 
     //wait for a job to be waiting in the ready queue 
     sem_wait(&waiting_jobs);     //this is the line thats crashing 
     //aquire lock to check which threads are waiting 
     pthread_mutex_lock(&sleeping_threads_mutex); 
     //go through list of threads to see which is waiting 
     for(int i=0; i<sleeping_threads_list->size(); i++){ 
      if(sleeping_threads_list->at(i)){ 
       //unlocks lock for access to list of waiting threads 
       pthread_mutex_unlock(&sleeping_threads_mutex); 
       //allows us access to the list of condition variables to signal the thread to resume execution 
       pthread_mutex_lock(&sleep_signal_mutex[i]); 
       pthread_cond_signal(&sleep_signal_cond[i]); 
       pthread_mutex_unlock(&sleep_signal_mutex[i]); 
      } 
     } 

    }//end of while(true) 
}//end of dispatchJobs() 


//sends file or metadata to client 
//run by worker thread 
//pos is position of condition variable that it waits to be signaled in the sleep_signal_cond[] array 
void* replyToClient(void* pos){ 
    int position = (long)pos; 
    while(true){ 
     //waits for dispather thread to signal it 
     pthread_mutex_lock(&sleep_signal_mutex[position]); 
     pthread_cond_wait(&sleep_signal_cond[position], &sleep_signal_mutex[position]); 
     pthread_mutex_unlock(&sleep_signal_mutex[position]); 


     //lock mutex to get job to be executed 
     pthread_mutex_lock(&jobs_mutex); 
     std::vector<int> job = jobs->front(); 
     //removes job from front of vector 
     jobs->erase(jobs->begin()); 
     //releases mutex 
     pthread_mutex_unlock(&jobs_mutex); 

     //socket file descriptor, used for writing to socket 
     int sock_fd =job[0]; 
     int file_size = job[1]; 
     //file descriptor for requested job 
     int fd = job[2]; 

     //holds output to be written to socket 
     char* outputBuffer = new char[BUFSIZ]; 

     //GET request, send file 
     if(file_size !=0){ 
      int readResult = 0; 
      while ((readResult = read(fd, outputBuffer, BUFSIZ)) > 0) { 
       if(write(sock_fd, outputBuffer, readResult) != readResult){ 
        printf("We may have a write error"); 
       } 
      } 
      if(readResult < 0){ 
       oops("Error Reading File"); 
      } 
      if(readResult == 0){ 
       printf("finished sending file"); 
      } 
     }else{ // HEAD request 

     } 
     //increment number of available threads 
     sem_post(&available_threads); 
    } 
}// end of replyToClient() 
+4

그것은 당신의 예를 단축 할 수 있습니까? (http://sscce.org/) –

답변

1

필자는 POSIX 세마포어를 사용하지 않았지만 이것이 무엇이 일어나고 있는지 생각합니다. 필자는 Linux 커널 세마포어에 대해서만 잘 알고 있으며 시스템에 대해서는 언급하지 않았습니다. init 함수의 세 번째 매개 변수는 아마도 count 변수를 설정합니다. 0 (= 사용 중이지만 대기중인 다른 프로세스 없음)으로 설정합니다. wait 함수는 count 변수를 1 : 1로 줄이는 것으로 시작하는 down()을 호출합니다. 즉, 사용하려고하는 세마포어가 잠겨 있습니다. 프로그램에서 코드를 탐색하는 것에서부터 꽤 오래 걸릴 것 같아서 잠금을 해제 할 수있는 것이 아무것도 없으므로 문제가 발생합니다. init에서 1로 설정해보십시오. 이것은 모두 필요한 것일 수 있습니다. 다시

+0

글쎄, 코드를 풀지는 않았지만 코드 영역에 도달했다. – gnometorule

2

확인 코드의 전체 논리 - 여기에 도달 할 수 있습니다 : jobs->size() == 0

pthread_mutex_lock(&jobs_mutex); 
std::vector<int> job = jobs->front(); 
//removes job from front of vector 
jobs->erase(jobs->begin()); 
//releases mutex 
pthread_mutex_unlock(&jobs_mutex); 

, front()erase()가 잘 관찰 효과가 발생할 수 있습니다 정의되지 않은 동작을 호출하는 경우.

프로그램이 여전히 다음과 같이 변경 후 충돌 확인 여부 :

//lock mutex to get job to be executed 
pthread_mutex_lock(&jobs_mutex); 
if (jobs->size() == 0) 
    { 
    pthread_mutex_unlock (&jobs_mutex); 
    continue; 
    } 
std::vector<int> job = jobs->front(); 
//removes job from front of vector 
jobs->erase(jobs->begin()); 
//releases mutex 
pthread_mutex_unlock(&jobs_mutex);