최근 IPC 용 공유 메모리를 사용하여 놀았습니다. 구현하고자하는 한 가지 점은 하나의 프로세스가 생성되고 하나의 프로세스가 소비되는 단순한 링 버퍼입니다. 각 프로세스는 자신의 위치를 추적 할 자체 일련 번호를 가지고 있습니다. 이 시퀀스 번호는 원자 프로세스를 사용하여 업데이트되어 다른 프로세스에서 올바른 값을 볼 수 있습니다. 링 버퍼가 가득 차면 생성자가 차단됩니다. 코드는 세마포어 또는 뮤텍스가 사용되지 않는다는 점에서 잠금이 없습니다. 현명한 내가 오히려 겸손 VM에 초당 약 2,000 만 메시지를 받고 있어요공유 메모리의 단일 프로듀서/소비자 링 버퍼
실적 - 그와 함께 꽤 행복 :
내 코드가 얼마나 '올바른'에 대한 궁금 무엇. 누구나 내재 된 문제/경쟁 조건을 발견 할 수 있습니까? 여기 내 코드가있다. 의견에 미리 감사드립니다.
#include <stdlib.h>
#include <stdio.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <time.h>
#include <unistd.h>
#include <string.h>
#define SHM_ID "/mmap-test"
#define BUFFER_SIZE 4096
#define SLEEP_NANOS 1000 // 1 micro
struct Message
{
long _id;
char _data[128];
};
struct RingBuffer
{
size_t _rseq;
char _pad1[64];
size_t _wseq;
char _pad2[64];
Message _buffer[BUFFER_SIZE];
};
void
producerLoop()
{
int size = sizeof(RingBuffer);
int fd = shm_open(SHM_ID, O_RDWR | O_CREAT, 0600);
ftruncate(fd, size+1);
// create shared memory area
RingBuffer* rb = (RingBuffer*)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
close(fd);
// initialize our sequence numbers in the ring buffer
rb->_wseq = rb->_rseq = 0;
int i = 0;
timespec tss;
tss.tv_sec = 0;
tss.tv_nsec = SLEEP_NANOS;
while(1)
{
// as long as the consumer isn't running behind keep producing
while((rb->_wseq+1)%BUFFER_SIZE != rb->_rseq%BUFFER_SIZE)
{
// write the next entry and atomically update the write sequence number
Message* msg = &rb->_buffer[rb->_wseq%BUFFER_SIZE];
msg->_id = i++;
__sync_fetch_and_add(&rb->_wseq, 1);
}
// give consumer some time to catch up
nanosleep(&tss, 0);
}
}
void
consumerLoop()
{
int size = sizeof(RingBuffer);
int fd = shm_open(SHM_ID, O_RDWR, 0600);
if(fd == -1) {
perror("argh!!!"); return;
}
// lookup producers shared memory area
RingBuffer* rb = (RingBuffer*)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
// initialize our sequence numbers in the ring buffer
size_t seq = 0;
size_t pid = -1;
timespec tss;
tss.tv_sec = 0;
tss.tv_nsec = SLEEP_NANOS;
while(1)
{
// while there is data to consume
while(seq%BUFFER_SIZE != rb->_wseq%BUFFER_SIZE)
{
// get the next message and validate the id
// id should only ever increase by 1
// quit immediately if not
Message msg = rb->_buffer[seq%BUFFER_SIZE];
if(msg._id != pid+1) {
printf("error: %d %d\n", msg._id, pid); return;
}
pid = msg._id;
++seq;
}
// atomically update the read sequence in the ring buffer
// making it visible to the producer
__sync_lock_test_and_set(&rb->_rseq, seq);
// wait for more data
nanosleep(&tss, 0);
}
}
int
main(int argc, char** argv)
{
if(argc != 2) {
printf("please supply args (producer/consumer)\n"); return -1;
} else if(strcmp(argv[1], "consumer") == 0) {
consumerLoop();
} else if(strcmp(argv[1], "producer") == 0) {
producerLoop();
} else {
printf("invalid arg: %s\n", argv[1]); return -1;
}
}