2016-07-14 2 views
1

는 다음과 같은 두 가지 프로세스를 고려ZeroMQ REQ/REP 성능

sender.cpp :

#include <zhelpers.h> 
... 
zmq::context_t ctx(1); 
... 
void foo(int i) 
{ 
    zmq::socket_t sender(ctx, ZMQ_REQ); 
    sender.connect("tcp://hostname:5000"); 

    std::stringstream ss; 
    ss <<"bar_" <<i; 
    std::string bar_i(std::move(ss.str()); 

    s_sendmore(sender, "foo "); 
    (i != N) ? s_send(sender, bar, 0) : s_send(sender, "done", 0); 
    s_recv(sender); 
} 

int main() 
{ 
    for(int i=0; i<=100000; ++i) 
     foo(i); 
    return 0; 
} 

receiver.cpp

#include <zhelpers.h> 
... 
int main() 
{ 
    zmq::context_t ctx(1); 
    zmq::socket_t rcv(ctx, ZMQ_REP); 
    rcv.bind("tcp://*:5000"); 

    std::string s1(""); 
    std::string s2(""); 

    while(s2 != "done") 
    { 
     s1 = std::move(s_recv(rcv)); 
     s2 = std::move(s_recv(rcv)); 
     std::cout <<"received: " <<s1 <<" " <<s2 <<"\n"; 
     s_send(rcv, "ACK"); 
    } 

    return 0; 
} 

이의 두 프로세스를 시작하자. 까지, 등등

foo bar_1 
foo bar_2 
... 

과 :

... 
foo bar_100000 

그리고 내가 기대하는 것은 수신기 프로세스가 보낸 사람이 그것을에 보내는 모든 메시지를 받게됩니다하고 인쇄 것이 오 어떤 차단도없이이 작업을 수행 할 것으로 기대합니다.

내 문제는 수신기가 항상 28215 번째 반복 (항상 해당 숫자 주위에 !!!)으로 붙어 있고 1 분 정도까지 차단된다는 것입니다. 그런 다음 100000까지 더 나아가지만 때로는 다시 붙어 있습니다. 제 질문은 물론입니다 : 왜 이런 일이 일어나고 있습니까? 어떻게 해결할 수 있습니까?

전역 범위에서 foo (.) 내에 '보낸 사람'을 넣으려고했는데 그 때 그것은 효과가있었습니다.이 경우 모든 인쇄물은 1에서 100000까지 부드럽고 초고속으로 물론이 경우 foo (.)가 호출 될 때마다 소켓이 생성되지 않았습니다. 하지만 불행히도 제 코드에서는 그렇게 할 수 없습니다.

왜이 블록이 발생하는지 이해하고 싶습니다.

+0

최대 소켓은 서버 측에서 제한 될 수 있습니다. 그것이 그것을 해결할 수도 증가 증가보십시오. tcp가 죽은 소켓을 지우는 데는 시간이 걸리고 소켓의 최대 수에 도달하는 많은 것들이 있기 때문입니다. – somdoron

답변

0

우선, 귀하의 예제는 컴파일되지 않기 때문에 매우 적합하지 않습니다. 그래서 여기 당신의 의도에 가까운 실제로

sender.cpp

#include <zmq.hpp> 
#include <string> 
#include <iostream> 
#include <string> 

void send(const std::string& msg) 
{ 
    // Prepare our context and socket 
    zmq::context_t context (1); 
    zmq::socket_t socket (context, ZMQ_REQ); 

    std::cout << "Connecting to receiver ..." << std::endl; 
    socket.connect ("tcp://localhost:5555"); 

    zmq::message_t request (100); 
    memcpy (request.data(), msg.c_str(), 100); 
    std::cout << "Sending message " << msg << "..." << std::endl; 
    socket.send (request); 
} 

int main() 
{ 
    for(int i = 0; i < 100000; ++i) 
    { 
     send(std::to_string(i)); 
    } 
    send("done"); 
} 

사용 뭔가 LINKE

g++ -std=c++11 -I/home/dev/cppzmq -I/home/dev/libzmq/include sender.cpp -lzmq -o sender 

receiver.cpp

#include <zmq.hpp> 
#include <string> 
#include <cstring> 
#include <iostream> 

int main() { 
    // Prepare our context and socket 
    zmq::context_t context (1); 
    zmq::socket_t socket (context, ZMQ_REP); 
    socket.bind ("tcp://*:5555"); 

    char buf[100] = {0}; 
    while (std::string(buf).compare("done")) { 
     zmq::message_t request; 

     // Wait for next request from client 
     socket.recv (&request); 
     std::memcpy(buf, request.data(), 100); 
     std::cout << "Received message " << buf << std::endl; 

     // Send reply back to client 
     zmq::message_t reply (5); 
     memcpy (reply.data(), "Hello", 5); 
     socket.send (reply); 
    } 
    return 0; 
} 

사용을 컴파일 할 몇 가지 exapmles은

g++ -std=c++11 -I/home/dev/cppzmq -I/home/dev/libzmq/include receiver.cpp -lzmq -o receiver 
프로세스를 시작할 때, 모든 것이 잘 작동하는 것 같다

는 수신기의 출력은없고 휴식 예상대로 :

Received message 99996 
Received message 99997 
Received message 99998 
Received message 99999 
Received message done 

하지만 내가 기대했던 :

netstat 
Active Internet connections (w/o servers) 
Proto Recv-Q Send-Q Local Address   Foreign Address   State  
tcp  0  0 localhost:38345   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:46228   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:60309   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:46916   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:47600   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:54454   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:46409   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:51142   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:40355   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:40005   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:45614   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:48974   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:41427   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:58740   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:58754   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:60044   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:57478   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:50419   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:44361   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:37284   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:38662   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:45968   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:57407   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:59200   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:41292   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:55243   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:51489   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:48865   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:35491   localhost:5555   TIME_WAIT 
... 
: NETSTAT를 보라

한 번 실행 한 후 TIME_WAIT 상태의 소켓이 20k 이상 (!) 있습니다. 가변 범위가 socketvoid send(...)sender.cpp에 있기 때문입니다. 정확히 zmq가 소켓 범위를 벗어날 때 소켓을 파괴 할 때 어떤 일을하는지 잘 모르겠지만 소켓의 fd 어딘가에 close()을 호출하여이 TIME_WAIT 상태로 소켓을 가져올 것이라고 확신합니다. 내 보낸 사람과받는 사람이 매끄럽게 달릴지라도 시스템이이 소켓을 어떻게 처리하는지 모르겠습니다. 또한, 귀하의 zhelpers.h 파일이 무엇인지 모르겠습니다.그러나 소켓을 전역 범위에 배치하면 하나의 단일 소켓에서 발신자 측에서 하나의 close() 호출 만 발생한다는 것을 알고 있습니다. 더 자세히 조사하기 위해 여기부터 시작하겠습니다. 어쩌면, 봐 how-to-forcibly-close-a-socket-in-time-wait ...

+0

감사합니다. 확인 할게. 죄송합니다 내 코드가 컴파일되지 않았습니다. 나는 그저 당신에게 문제 자체를 보여 주려했다. (그래서 나는 예를 들어 '...'을 사용했다.). 나는 모든 세부 사항에 가고 싶지 않았다. 예를 들어 zhelpers.h는 다음에서 찾을 수 있습니다. https://github.com/imatix/zguide2/tree/master/examples/C%2B%2B – gybacsi

+1

또 다른 한 가지는, 글로벌 범위에서 소켓 변수를 전송하지만 스코프를 조금 더 크게 만들면 TIME_WAIT 명시된 수천 개의 소켓으로 이미 문제가 해결됩니다. 즉, 송신 루프 외부에서 선언하고 적어도 100000 개의 메시지를 보내는 데 다시 사용하십시오. . – yussuf