2013-04-25 2 views
4

아래의 예제 코드를 고려해보십시오 (오류가 발생해도 오류가 발생하는 경우 신속하게 입력했습니다. 이론에 관심이 있습니다).recv가 차단되면 어떻게 올바르게 정리할 수 있습니까?

bool shutDown = false; //global 

int main() 
{ 
    CreateThread(NULL, 0, &MessengerLoop, NULL, 0, NULL); 
    //do other programmy stuff... 
} 


DWORD WINAPI MessengerLoop(LPVOID lpParam) 
{ 
    zmq::context_t context(1); 
    zmq::socket_t socket (context, ZMQ_SUB); 
    socket.connect("tcp://localhost:5556"); 
    socket.setsockopt(ZMQ_SUBSCRIBE, "10001 ", 6); 

    while(!shutDown) 
    { 
    zmq_msg_t getMessage; 
    zmq_msg_init(&getMessage); 
    zmq_msg_recv (&getMessage, socket, 0); //This line will wait forever for a message 
    processMessage(getMessage); 
    } 
} 

들어오는 메시지를 기다리고 적절히 처리하기위한 스레드가 만들어집니다. 스레드는 shutDown이 true로 설정 될 때까지 루핑됩니다.

ZeroMQ에서 Guide은 무엇을 정리해야하는지, 즉 메시지, 소켓 및 컨텍스트를 명시합니다.

내 문제는 : recv이 메시지를 영원히 기다리고 스레드를 차단하므로 메시지를받지 못하면 어떻게 안전하게이 스레드를 종료 할 수 있습니까?

+0

'zmq_msg_recv'에서'ZMQ_DONTWAIT' 플래그를 사용하고 짧은 시간 동안 매뉴얼'Sleep'을 추가하십시오? 더 많은 폴링 방법을 사용 하시겠습니까? –

+0

zmg_ * API에 대해 모르겠지만 일반 소켓 API에서는 다른 스레드에서 소켓을 닫으면 충분하며 recv는 실패로 종료됩니다. 이렇게하려면 다른 스레드의 소켓에 연결하여'socket' 변수를 닫을 수 있어야합니다. 'MessengerLoop'은 무한'recv' 루프 만 포함해야합니다. –

+0

@AlexFarber 필자는 실수로 생각하지 않는다면 동일한 MessengerLoop 스레드에서 모든 메시지 처리가 수행되지 않을 것이라고 생각 했었습니까?어떤 경우에는 "나는 메시지를 기다리고있어 네가 나를 해산시킬 수있다"와 "나는 뭔가를 처리하는 데 바쁘다. 아직 나를 끝내지 말자."를 구별하는 신호 시스템을 설치해야한다. 이것은 Polling 기반 시스템보다 난해하게 보인다. 나는 말라 키 메시징을하는이 네트워크 메시징에 익숙하지 않다. 나는 뭔가를 놓치고 있는가? – Ian

답변

7

블로킹 호출 비 차단을 사용할 수 있습니다. 먼저, 언어와 바인딩에 따라 인터럽트 (Ctrl-C, SIGINT, SIGTERM)가 호출을 종료합니다. 바인딩 또는 바인딩에 따라 오류 또는 널 메시지 (libzmq가 EINTR 오류를 리턴)를 리턴합니다.

둘째, 다른 스레드에서 컨텍스트를 종료하면 차단 호출도 종료됩니다 (libzmq는 ETERM 오류를 반환 함).

세 번째로 소켓에 시간 초과를 설정할 수 있으므로 어떤 경우에도 시간 제한이 초과 된 경우 아무런 데이터도 반환되지 않습니다. 우리는 종종 이렇게하지는 않지만 어떤 경우에는 유용 ​​할 수 있습니다.

마지막으로, 우리는 실제로 receive를 차단하지 않지만 zmq_poll을 사용하여 소켓에 대기중인 메시지가 있는지 알아 본 다음 해당 소켓에서 수신합니다. 이것은 더 많은 소켓을 처리하는 방법입니다.

+0

을 계속 스킵하면 REQ-REP 쌍에 대한 폴링을 계속 사용 하시겠습니까? – Ian

+0

예. 블로킹 읽기는 정확하게 하나의 소켓에서 작동하는 단순한 작업에서는 문제가 없지만 대부분의 실제 작업에서는 여러 개의 소켓을 사용합니다. 예를 들어 REQ 소켓을 사용하여 작업을 수신 한 다음 몇 분마다 PUB 소켓에 상태를 기록 할 수 있습니다. –

+0

정확히 구현 한 것입니다. 나는 올바른 길을 가고 있다는 것을 알고 있습니다. 감사! – Ian

1

당신은 몇 가지 방법으로 종료됩니다 호출 플래그를 ZMQ_DONTWAIT

while(!shutDown) 
    { 
    zmq_msg_t getMessage; 
    zmq_msg_init(&getMessage); 
    while(-1 == zmq_msg_recv(&getMessage, socket, ZMQ_DONTWAIT)) 
    { 
     if (EAGAIN != errno || shutDown) 
     { 
     break; 
     } 
     Sleep(100); 
    } 
    processMessage(getMessage); 
    } 
+0

이 작은 논리 오류 : 1)의 경우 (EAGAIN == errno를 || 종료) 2) 를 추가하는 경우 (종료) { 휴식; } recv while 루프에서 깨어 난 후 processMessage – menrfa

0

zmq 컨텍스트가 제거 될 때마다 zmq_msg_recv는 -1을 수신합니다. 나는 이것을 모든 코드에서 종료 조건으로 사용한다.

while (!shutdown) 
{ 
    .. 
    .. 
    int rc = zmq_msg_recv (&getMessage, socket, 0); 
    if (rc != -1) 
    { 
     processMessage; 
    } 
    else 
     break; 
} 

올바른 정리를 위해 main() 끝에있는 zmq 컨텍스트를 파괴해야합니다.

zmq_ctx_destroy(zctx); 
0

ZMQ 메시지의 수신을 관리하는 SUB (가입자)라고하는 클래스가 있다고 가정 해 보겠습니다. 소멸자 또는 주요 기능/클래스의 exit 함수에서 다음 전화 :

pub->close(); 

/// 
/// Close the publish context 
/// 
void PUB::close() 
{ 
    zmq_close (socket); 
    zmq_ctx_destroy (context); 
} 

이 활성화 것이다 'recv를'당신이 무시할 수있는 오류 메시지와 함께 종료를 차단. 응용 프로그램은 올바른 방법으로 편안하게 종료됩니다. 이것이 올바른 방법입니다. 행운을 빕니다!

관련 문제