2011-03-29 5 views
2

zeroMQ을 여러 스레드간에 메시징 시스템을 구현하는 방법으로 사용하려고합니다. 아래 코드를 시도했지만 작동하지 않습니다. 구체적으로 각 스레드의 zmq_recv 호출은 실행될 메시지를 대기/차단하지 않습니다.ZeroMQ 메시지를 사용하여 스레드 간 통신

이 코드를 사용하면 도움이 될 수 있습니까?

나는 당신은 소켓을 종료하고 있고 ZeroMQ 당신이 스레드를 만드는 직후에 리눅스 OS 및 GCC

안부

AFG

static void * 
    worker_routine (void *context) { 
     // Socket to talk to dispatcher 
     void *receiver = zmq_socket (context, ZMQ_REP); 
     zmq_connect (receiver, "inproc://workers"); 
     while (1) { 

      zmq_msg_t request; 
      zmq_msg_init(&request); 
      zmq_recv(receiver, &request, 0); 
      printf ("Received request\n"); 
      // Do some 'work' 
      usleep (1000); 
      // Send reply back to client 
      zmq_send (receiver, &request, 0); 
     } 
     zmq_close (receiver); 
     return NULL; 
    } 

    int main (void) { 

    void *context = zmq_init (1); 
    void *clients = zmq_socket (context, ZMQ_REP); 
    zmq_bind (clients, "inproc://workers"); 

    int thread_nbr; 
    for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) { 
     pthread_t worker; 
     pthread_create (&worker, NULL, worker_routine, context); 
    } 

    zmq_close (clients); 
    zmq_term (context); 
    return 0; 
    } 
+0

. 내 목적을 위해 QUEUE와 같은 zmq_device를 만들어야 만하는지 아는 사람 있습니까? 나는 또한 프로토콜로 "ipc"를 사용하는 샘플이 있다는 것을 알아 냈다. 나는 항상 MT에서 반드시 "inproc"을 사용해야한다고 생각했다. –

답변

3

을 사용하고 있습니다. 그들은 아마도 블로킹 상태에 도달 할 시간이 없을 것이며, 그렇다면 zmq 컨텍스트를 파괴하자마자 실패 할 것입니다. zmq_term man page에서 :

상황의 종료는 다음 단계로 수행됩니다 : 현재 컨텍스트 내에서 열려있는 소켓에 진행

모든 차단 작업이 ETERM의 에러 코드와 함께 즉시을 반환 된다.

+0

아마도 "sleep"비트를 추가해보십시오. 어쨌든 여전히 내가 기대했던 행동이 그 설정으로 이루어 졌는지는 모르겠다. 질문 아래에 내 의견을 읽어라. 추신. 당신의 도움을 주셔서 감사합니다! –

6

두 소켓 모두 REP입니다. 원하는 것은 REQ + REP입니다.

0

먼저 @sustrik은 REQREP을 사용해야하며 주 스레드와 작업자 스레드는 모두 REP이 될 수 없습니다.

둘째, 당신은 당신의 주 스레드에서 차단 루프의 일종을 제공해야합니다 : 다시 ZeroMQ에 가이드를 읽고 있었다

int main (int argc, char **argv) 
{ 
    void *context = zmq_init (1); 
    void *clients = zmq_socket (context, ZMQ_REP); // use ZMQ_REQ on the clients 
    zmq_bind (clients, "inproc://workers"); 

    int thread_nbr; 
    for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) { 
     pthread_t worker; 
     pthread_create (&worker, NULL, worker_routine, context); 
    } 

    while (TRUE) 
    { 
        // worker thread connected asking for work 
        zmq_msg_t request; 
        zmq_msg_init (&request); 
     zmq_recv (clients, &request, 0); 
     zmq_msg_close (&request); 

     // do whatever you need to do with the clients' request here 

     // send work to clients 
     zmq_msg_t reply; 
     zmq_msg_init_data (&reply, "Reply", 5, NULL, NULL); 
     zmq_send (clients, &reply, 0); 
     zmq_msg_close (&reply); 
    } 

    zmq_close (clients); 
    zmq_term (context); 
    return 0; 
}