그래서 저는 Python의 ZMQ와 C/C++ 확장의 ZMQ 사이의 간단한 통신을 설정하려고합니다. 파이썬은 컨텍스트를 설정하고, inproc 소켓을 바인드하며, 컨텍스트와 소켓 이름을 확장에 전달합니다. 확장 프로그램은 자체 소켓을 설정하고, 연결하고, 메시지를 수신합니다. 그런 다음 Python은 머리말을 보낸 다음 사전의 문자열 표현을 확장자로 보냅니다. REQ/REP 소켓을 사용하는 아주 간단한 것들. 그러나 찾을 수없는 몇 가지 이유로 socket.send에 대한 호출이 차단되고 확장이 zmq_recv에 대한 호출을 지나칠 수 없습니다. 나는 거의 동일한 시나리오가 적용되는 테스트 환경을 가지고 있지만 소켓은 차단되지 않고 코드를 세 번 검사했고 같은 방식으로 작동해야합니다.ZMQ : socket_send/recv blocking
PYTHON :
import zmq
import cppextension
# No lectures about using threading please. I'm restricted to, in essence
# using this function because of the code base I'm working with.
from thread import start_new_thread
socket = self.zmq_context.socket(zmq.REQ)
socket_name = "inproc://agl"
socket.bind(socket_name)
t = start_native_thread(cppextension.actor,
(self.zmq_context, socket_name))
test_send = {"foo": 1, "bar": 2}
# BLOCKS ON THIS LINE VVVVV
socket.send("TEST", flags=zmq.SNDMORE)
socket.send(str(test_send))
socket.recv()
socket.send("STOP")
C/C++ :
// Originally these used std::basic_string<Py_UNICODE> but I reverted
// back to normal std::string so I can use a JSON parsing library.
typedef string pystring;
typedef char pystring_t;
extern "C" PyObject *
actor(PyObject *self, PyObject *args) {
PyObject *py_context, *py_connect_to;
PyThreadState *_save;
void *context;
char *connect_to;
void *socket;
int rc;
if(!PyArg_ParseTuple(args, "OO", &py_context, &py_connect_to)) {
PyErr_SetString(PyExc_TypeError, "Expected two arguments (ZMQ context, name of socket to connect to)");
return NULL;
}
py_context = PyObject_GetAttrString(py_context, "_handle");
if(py_context == NULL) {
PyErr_SetString(PyExc_TypeError, "Could not get '_handle' from context");
return NULL;
}
if(!PyInt_Check(py_context)) {
PyErr_SetString(PyExc_TypeError, "_handle was not an integer");
return NULL;
}
context = (void*)PyInt_AsLong(py_context);
connect_to = new char[PyString_Size(py_connect_to) + 1];
strcpy(connect_to, PyString_AsString(py_connect_to));
_save = PyEval_SaveThread();
//
// GIL-less operation BEGIN
// ** WARNING: Do NOT call any functions that begin with 'Py', or touch any
// data structures that begin with 'Py' while in this section. It *WILL*
// blow up the Python interpreter.
//
socket = zmq_socket(context, ZMQ_REP);
rc = zmq_connect(socket, connect_to);
pystring TEST("TEST");
pystring STOP("STOP");
pystring SUCCESS("SUCCESS");
pystring FAILURE("FAILURE");
if(rc == 0) {
int going = 1;
// Should be able to hold a full megabyte of text, which should be enough
// for any message being passed in.
// Is there a way to query size of the incoming message...?
char buffer[1000000];
while(going) {
// BLOCKS ON THIS LINE VVVVVV
int size = zmq_recv(socket, buffer, 1000000, 0);
if(size == -1) {
// ERROR
continue;
}
// Assume we don't get larger than 1MB of data. Should put a
// check around this at some point, but not right now.
buffer[size] = 0;
pystring fullmsg(buffer);
cout << "ZMQ RECIEVED: " << fullmsg << endl;
if(fullmsg == TEST) {
size = zmq_recv(socket, &buffer, 1000000, 0);
if(size != -1) {
buffer[size] = 0;
pystring json_fullmsg(buffer);
cout << "ZMQ JSON: " << json_fullmsg << endl;
contacts.add(json_fullmsg);
zmq_send(socket, SUCCESS.c_str(), SUCCESS.size() + 1, 0);
}
else {
zmq_send(socket, FAILURE.c_str(), FAILURE.size() + 1, 0);
}
}
else if(fullmsg == STOP) {
going = 0;
zmq_send(socket, SUCCESS.c_str(), SUCCESS.size() + 1, 0);
}
}
}
else {
// ERROR
int err = zmq_errno();
switch(err) {
case EINVAL:
cout << "ZMQ CONNECT ERR: " << "Endpoint supplied is invalid" << endl;
break;
default:
cout << "ZMQ CONNECT ERR: " << err << endl;
break;
}
}
zmq_close(socket);
//
// GIL-less operation END
//
PyEval_RestoreThread(_save);
Py_INCREF(Py_None);
return Py_None;
}
매우 감사합니다 여기에서 무슨 일을 알아내는 어떤 도움.
EDIT :이 코드는 gevent가 표준 라이브러리를 monkeypatched 한 환경에서 실행됩니다. 이것은 monkeypatching이 발생하기 전에 저장 되었기 때문에 thread.start_new_thread를 사용하는 이유 중 일부입니다. 그리고 실제 스레드가 녹색 스레드가 아닌 것을 원합니다.
문제가 해결되지는 않지만 Python은 "//"이 아닌 주석에 "#"을 사용합니다. – iCodez
Doh! 아마도 C/C++ 용어로 생각을 추가 할 때 그 주석을 추가했을 것입니다. 고마워, 그걸 고쳐. – Freezerburn
동일한 zmqlib 버전을 사용하고 있습니까? pythons zmqlib는 pip에 의해 확장 기능으로 컴파일됩니까? – RickyA