2013-08-23 6 views
2

그래서 저는 Python의 ZMQ와 C/C++ 확장의 ZMQ 사이의 간단한 통신을 설정하려고합니다. 파이썬은 컨텍스트를 설정하고, inproc 소켓을 바인드하며, 컨텍스트와 소켓 이름을 확장에 전달합니다. 확장 프로그램은 자체 소켓을 설정하고, 연결하고, 메시지를 수신합니다. 그런 다음 Python은 머리말을 보낸 다음 사전의 문자열 표현을 확장자로 보냅니다. REQ/REP 소켓을 사용하는 아주 간단한 것들. 그러나 찾을 수없는 몇 가지 이유로 socket.send에 대한 호출이 차단되고 확장이 zmq_recv에 대한 호출을 지나칠 수 없습니다. 나는 거의 동일한 시나리오가 적용되는 테스트 환경을 가지고 있지만 소켓은 차단되지 않고 코드를 세 번 검사했고 같은 방식으로 작동해야합니다.ZMQ : socket_send/recv blocking

PYTHON :

import zmq 
import cppextension 
# No lectures about using threading please. I'm restricted to, in essence 
# using this function because of the code base I'm working with. 
from thread import start_new_thread 

socket = self.zmq_context.socket(zmq.REQ) 
socket_name = "inproc://agl" 
socket.bind(socket_name) 
t = start_native_thread(cppextension.actor, 
         (self.zmq_context, socket_name)) 

test_send = {"foo": 1, "bar": 2} 
# BLOCKS ON THIS LINE VVVVV 
socket.send("TEST", flags=zmq.SNDMORE) 
socket.send(str(test_send)) 
socket.recv() 
socket.send("STOP") 

C/C++ :

// Originally these used std::basic_string<Py_UNICODE> but I reverted 
// back to normal std::string so I can use a JSON parsing library. 
typedef string pystring; 
typedef char pystring_t; 

extern "C" PyObject * 
actor(PyObject *self, PyObject *args) { 
    PyObject *py_context, *py_connect_to; 
    PyThreadState *_save; 
    void *context; 
    char *connect_to; 
    void *socket; 
    int rc; 

    if(!PyArg_ParseTuple(args, "OO", &py_context, &py_connect_to)) { 
     PyErr_SetString(PyExc_TypeError, "Expected two arguments (ZMQ context, name of socket to connect to)"); 
     return NULL; 
    } 
    py_context = PyObject_GetAttrString(py_context, "_handle"); 
    if(py_context == NULL) { 
     PyErr_SetString(PyExc_TypeError, "Could not get '_handle' from context"); 
     return NULL; 
    } 
    if(!PyInt_Check(py_context)) { 
     PyErr_SetString(PyExc_TypeError, "_handle was not an integer"); 
     return NULL; 
    } 
    context = (void*)PyInt_AsLong(py_context); 
    connect_to = new char[PyString_Size(py_connect_to) + 1]; 
    strcpy(connect_to, PyString_AsString(py_connect_to)); 
    _save = PyEval_SaveThread(); 

    // 
    // GIL-less operation BEGIN 
    // ** WARNING: Do NOT call any functions that begin with 'Py', or touch any 
    // data structures that begin with 'Py' while in this section. It *WILL* 
    // blow up the Python interpreter. 
    // 
    socket = zmq_socket(context, ZMQ_REP); 
    rc = zmq_connect(socket, connect_to); 

    pystring TEST("TEST"); 
    pystring STOP("STOP"); 
    pystring SUCCESS("SUCCESS"); 
    pystring FAILURE("FAILURE"); 

    if(rc == 0) { 
     int going = 1; 
     // Should be able to hold a full megabyte of text, which should be enough 
     // for any message being passed in. 
     // Is there a way to query size of the incoming message...? 
     char buffer[1000000]; 
     while(going) { 
      // BLOCKS ON THIS LINE VVVVVV 
      int size = zmq_recv(socket, buffer, 1000000, 0); 
      if(size == -1) { 
       // ERROR 
       continue; 
      } 
      // Assume we don't get larger than 1MB of data. Should put a 
      // check around this at some point, but not right now. 
      buffer[size] = 0; 

      pystring fullmsg(buffer); 
      cout << "ZMQ RECIEVED: " << fullmsg << endl; 
      if(fullmsg == TEST) { 
       size = zmq_recv(socket, &buffer, 1000000, 0); 
       if(size != -1) { 
        buffer[size] = 0; 
        pystring json_fullmsg(buffer); 
        cout << "ZMQ JSON: " << json_fullmsg << endl; 
        contacts.add(json_fullmsg); 
        zmq_send(socket, SUCCESS.c_str(), SUCCESS.size() + 1, 0); 
       } 
       else { 
        zmq_send(socket, FAILURE.c_str(), FAILURE.size() + 1, 0); 
       } 
      } 
      else if(fullmsg == STOP) { 
       going = 0; 
       zmq_send(socket, SUCCESS.c_str(), SUCCESS.size() + 1, 0); 
      } 
     } 
    } 
    else { 
     // ERROR 
     int err = zmq_errno(); 
     switch(err) { 
     case EINVAL: 
      cout << "ZMQ CONNECT ERR: " << "Endpoint supplied is invalid" << endl; 
      break; 
     default: 
      cout << "ZMQ CONNECT ERR: " << err << endl; 
      break; 
     } 
    } 
    zmq_close(socket); 
    // 
    // GIL-less operation END 
    // 

    PyEval_RestoreThread(_save); 
    Py_INCREF(Py_None); 
    return Py_None; 
} 

매우 감사합니다 여기에서 무슨 일을 알아내는 어떤 도움.

EDIT :이 코드는 gevent가 표준 라이브러리를 monkeypatched 한 환경에서 실행됩니다. 이것은 monkeypatching이 발생하기 전에 저장 되었기 때문에 thread.start_new_thread를 사용하는 이유 중 일부입니다. 그리고 실제 스레드가 녹색 스레드가 아닌 것을 원합니다.

+0

문제가 해결되지는 않지만 Python은 "//"이 아닌 주석에 "#"을 사용합니다. – iCodez

+0

Doh! 아마도 C/C++ 용어로 생각을 추가 할 때 그 주석을 추가했을 것입니다. 고마워, 그걸 고쳐. – Freezerburn

+0

동일한 zmqlib 버전을 사용하고 있습니까? pythons zmqlib는 pip에 의해 확장 기능으로 컴파일됩니까? – RickyA

답변

1

확실하지이 여기에 도움이되지만 것이다 간다 :

  • 당신은 send() 블록이 확실하고 간단하게 응답을 받고 결코의 문제가 아니다? send()ZMQ_NOBLOCK으로 호출하고 예외가 발생하는지 확인할 수 있습니다. 그렇다면 실제로 send()이 메시지를 큐에 넣지 못했습니다.
  • REQ/REP 대신 PAIR 소켓을 고려 했습니까? 스레드간에 통신을위한 The guide recommends this.
+1

zmq.DONTWAIT를 플래그로 사용하는 socket.send 명령을 사용하면 "Resource temporarily unavailable"이 콘솔에 인쇄됩니다. Python이 REP를 사용하고 C/C++에서 REQ를 사용하고 TEST 데이터를 보내기 전에 하나의 메시지를 수신하도록 비트를 약간 변경하면 "Connection refused"가 표시됩니다. C/C++ 측에서 EAGAIN과 EFSM 오류를 모두 보았습니다. – Freezerburn

+0

@Freezeburn. PAIR 소켓은? – bavaza

+0

불행히도, 그들은 아무것도하지 않은 것처럼 보입니다. REQ/REP 소켓을 사용하는 것과 똑같은 동작을 계속합니다. – Freezerburn

3

두 가지,

당신이 당신의 수정 된 버전에 필수/담당자를 사용 때문에 "... 보내, RECV을 보내 보내" 작동하지 않습니다. send/recv는 모두 'lock-step'방식으로 작동해야합니다 (send, recv, send, revc.)

ZMQ_NOBLOCK은 EAGAIN의 예외를 발생시킵니다. "소켓 연결 이 완료되지 않았습니다. 나중에 다시 오세요." 바인드 후 타이머/수면을 놓으십시오. send/recv. 이것이 "자원을 일시적으로 사용할 수 없음"msg의 원인입니다.

희망 하시겠습니까?

mr. onoffon