2013-04-29 2 views
5

최근 IPC 용 공유 메모리를 사용하여 놀았습니다. 구현하고자하는 한 가지 점은 하나의 프로세스가 생성되고 하나의 프로세스가 소비되는 단순한 링 버퍼입니다. 각 프로세스는 자신의 위치를 ​​추적 할 자체 일련 번호를 가지고 있습니다. 이 시퀀스 번호는 원자 프로세스를 사용하여 업데이트되어 다른 프로세스에서 올바른 값을 볼 수 있습니다. 링 버퍼가 가득 차면 생성자가 차단됩니다. 코드는 세마포어 또는 뮤텍스가 사용되지 않는다는 점에서 잠금이 없습니다. 현명한 내가 오히려 겸손 VM에 초당 약 2,000 만 메시지를 받고 있어요공유 메모리의 단일 프로듀서/소비자 링 버퍼

실적 - 그와 함께 꽤 행복 :

내 코드가 얼마나 '올바른'에 대한 궁금 무엇

. 누구나 내재 된 문제/경쟁 조건을 발견 할 수 있습니까? 여기 내 코드가있다. 의견에 미리 감사드립니다.

#include <stdlib.h> 
#include <stdio.h> 
#include <fcntl.h> 
#include <sys/mman.h> 
#include <sys/stat.h> 
#include <time.h> 
#include <unistd.h> 
#include <string.h> 

#define SHM_ID "/mmap-test" 
#define BUFFER_SIZE 4096 
#define SLEEP_NANOS 1000 // 1 micro 

struct Message 
{ 
    long _id; 
    char _data[128]; 
}; 

struct RingBuffer 
{ 
    size_t _rseq; 
    char _pad1[64]; 

    size_t _wseq; 
    char _pad2[64]; 

    Message _buffer[BUFFER_SIZE]; 
}; 

void 
producerLoop() 
{ 
    int size = sizeof(RingBuffer); 
    int fd = shm_open(SHM_ID, O_RDWR | O_CREAT, 0600); 
    ftruncate(fd, size+1); 

    // create shared memory area 
    RingBuffer* rb = (RingBuffer*)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 
    close(fd); 

    // initialize our sequence numbers in the ring buffer 
    rb->_wseq = rb->_rseq = 0; 
    int i = 0; 

    timespec tss; 
    tss.tv_sec = 0; 
    tss.tv_nsec = SLEEP_NANOS; 

    while(1) 
    { 
     // as long as the consumer isn't running behind keep producing 
     while((rb->_wseq+1)%BUFFER_SIZE != rb->_rseq%BUFFER_SIZE) 
     { 
      // write the next entry and atomically update the write sequence number 
      Message* msg = &rb->_buffer[rb->_wseq%BUFFER_SIZE]; 
      msg->_id = i++; 
      __sync_fetch_and_add(&rb->_wseq, 1); 
     } 

     // give consumer some time to catch up 
     nanosleep(&tss, 0); 
    } 
} 

void 
consumerLoop() 
{ 
    int size = sizeof(RingBuffer); 
    int fd = shm_open(SHM_ID, O_RDWR, 0600); 
    if(fd == -1) { 
     perror("argh!!!"); return; 
    } 

    // lookup producers shared memory area 
    RingBuffer* rb = (RingBuffer*)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 

    // initialize our sequence numbers in the ring buffer 
    size_t seq = 0; 
    size_t pid = -1; 

    timespec tss; 
    tss.tv_sec = 0; 
    tss.tv_nsec = SLEEP_NANOS; 

    while(1) 
    { 
     // while there is data to consume 
     while(seq%BUFFER_SIZE != rb->_wseq%BUFFER_SIZE) 
     { 
      // get the next message and validate the id 
      // id should only ever increase by 1 
      // quit immediately if not 
      Message msg = rb->_buffer[seq%BUFFER_SIZE]; 
      if(msg._id != pid+1) { 
       printf("error: %d %d\n", msg._id, pid); return; 
      } 
      pid = msg._id; 
      ++seq; 
     } 

     // atomically update the read sequence in the ring buffer 
     // making it visible to the producer 
     __sync_lock_test_and_set(&rb->_rseq, seq); 

     // wait for more data 
     nanosleep(&tss, 0); 
    } 
} 

int 
main(int argc, char** argv) 
{ 
    if(argc != 2) { 
     printf("please supply args (producer/consumer)\n"); return -1; 
    } else if(strcmp(argv[1], "consumer") == 0) { 
     consumerLoop(); 
    } else if(strcmp(argv[1], "producer") == 0) { 
     producerLoop(); 
    } else { 
     printf("invalid arg: %s\n", argv[1]); return -1; 
    } 
} 

답변

1

언뜻보기에 나에게 맞는 것 같습니다. 성능에 만족한다는 것을 알고 있지만 재미있는 실험은 __sync_fetch_and_add보다 가벼운 무게를 사용하는 것일 수 있습니다. AFAIK는 전체 메모리 장벽으로 비용이 많이 듭니다. 단일 생산자와 단일 소비자가 있기 때문에 릴리스 및 해당 취득 작업을 통해 성능이 향상됩니다. Facebook의 Folly 라이브러리에는 여기에 새로운 C++ 11 원자를 사용하는 단일 제작자 단일 고객 대기열이 있습니다. https://github.com/facebook/folly/blob/master/folly/ProducerConsumerQueue.h

관련 문제