작업을 배출하는 서버와이를 실행하는 여러 작업자가 서버/클라이언트 스크립트를 작성하려고합니다. 문제는 인공 호흡기에 많은 작업이있어 심장 박동으로 기억을 채울 수 있다는 것입니다. 바인딩하기 전에 HWM을 설정하려고했지만 성공하지 못했습니다. 작업자가 연결 되 자마자 메시지를 계속 보내면서 설정 한 HWM을 완전히 무시합니다. 또한 수행 된 작업을 기록하는 싱크가 있습니다.ZeroMQ : PUSH의 HWM이 작동하지 않습니다.
server.py
import zmq
def ventilate():
context = zmq.Context()
# Socket to send messages on
sender = context.socket(zmq.PUSH)
sender.setsockopt(zmq.SNDHWM, 30) #Big messages, so I don't want to keep too many in queue
sender.bind("tcp://*:5557")
# Socket with direct access to the sink: used to syncronize start of batch
sink = context.socket(zmq.PUSH)
sink.connect("tcp://localhost:5558")
print "Sending tasks to workers…"
# The first message is "0" and signals start of batch
sink.send('0')
print "Sent starting signal"
while True:
sender.send("Message")
if __name__=="__main__":
ventilate()
worker.py
import zmq
from multiprocessing import Process
def work():
context = zmq.Context()
# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")
# Socket to send messages to
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")
# Process t asks forever
while True:
msg = receiver.recv_msg()
print "Doing sth with msg %s"%(msg)
sender.send("Message %s done"%(msg))
if __name__ == "__main__":
for worker in range(10):
Process(target=work).start()
sink.py
import zmq
def sink():
context = zmq.Context()
# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")
# Wait for start of batch
s = receiver.recv()
print "Received start signal"
while True:
msg = receiver.recv_msg()
print msg
if __name__=="__main__":
sink()
문제를 재현 해 보겠습니다. 사용중인 PyZMQ 및 ZMQ의 버전을 알려주시겠습니까? 'zmq.zmq_version()'과'zmq .__ version__'을 실행하십시오. –
ZMQ 버전은 4.0.3이고 pyzmq는 13.1.0 – Elvin
입니다 .- 어, 그게 짜증나는 조합입니다. 당신은 pyzmq 14.0.1로 업데이트하고 그걸로 테스트 할 수 있습니까? (사용하는 zmq 버전이 마음에 듭니다. 그냥 알려주세요). 나는 zmq 3.x와 함께 pyzmq 13.1.0에있다.x를 Windows에서 실행하고 zmq 버전을 pyzmq v14로 업데이트하지 않고 변경하는 것이 좋지만 시도하기 전에 해당 버전의 문제점을 계속 확인하고 싶습니다. –