ZMQ를 사용하여 다중 게시자/다중 가입자 토폴로지를 구축하려고합니다. 약간의 수정을 통해 espresso.py 샘플을 사용하여 예제를 만들었습니다. 내가 zeromq에 대해 상당히 새롭기 때문에 내가하고있는 일이 옳은지 확인하고 싶었다. 자유롭게 비판하고 의견을 말하십시오.ZeroMQ 여러 게시자와 XPUB/XSUB를 사용하는 구독자가 올바른 구현입니까?
나는 기본적으로 몇 가지 교훈을 다뤘습니다.
zmq 소켓은 바인드 후()는 연결을 실행할 수 있습니다 즉들을 의미하지 않는다 단일 네트워크 카드 (일명 일반 소켓) 바인딩
만 여러 프로세스에서 하나 개의 포트에 바인딩 할 수 있습니다 (소켓 개발자에게는 매우 혼란 스럽지만 이것은 소켓이 아닙니다.)
프록시와 XPUB/XSUB는 구독자가 모든 게시자를 알아내어 연결할 필요가 없을 때 패턴으로 사용됩니다. 정말 아래 코드에 대한처럼 해달라고 무엇
각 가입자는 별도의 소켓에 결합 것입니다. 이것이 필요한 악이지만, 어쨌든 나는 이것이 올바르게 보이지 않는다고 생각하고 있었다.
여기 내 샘플 코드입니다.
# Espresso Pattern
# This shows how to capture data using a pub-sub proxy
#
import time
from random import randint
from string import uppercase
from threading import Thread
import zmq
from zmq.devices import monitored_queue
from zhelpers import zpipe
# The subscriber thread requests messages starting with
# A and B, then reads and counts incoming messages.
def subscriber_thread():
ctx = zmq.Context.instance()
# Subscribe to "A" and "B"
subscriber = ctx.socket(zmq.SUB)
subscriber.connect("tcp://localhost:6001")
subscriber.setsockopt(zmq.SUBSCRIBE, b"A")
subscriber.setsockopt(zmq.SUBSCRIBE, b"B")
count = 0
while True:
try:
msg = subscriber.recv_multipart()
except zmq.ZMQError as e:
if e.errno == zmq.ETERM:
break # Interrupted
else:
raise
count += 1
print ("Subscriber received %d messages" % count)
# .split publisher thread
# The publisher sends random messages starting with A-J:
def publisher_thread(port, char):
ctx = zmq.Context.instance()
publisher = ctx.socket(zmq.PUB)
publisher.bind("tcp://*:"+str(port))
while True:
string = "%s-%05d" % (char, randint(port, port+500))
try:
publisher.send(string)
except zmq.ZMQError as e:
if e.errno == zmq.ETERM:
break # Interrupted
else:
raise
time.sleep(0.1) # Wait for 1/10th second
# .split listener thread
# The listener receives all messages flowing through the proxy, on its
# pipe. Here, the pipe is a pair of ZMQ_PAIR sockets that connects
# attached child threads via inproc. In other languages your mileage may vary:
def listener_thread(pipe):
# Print everything that arrives on pipe
while True:
try:
print (pipe.recv_multipart())
except zmq.ZMQError as e:
if e.errno == zmq.ETERM:
break # Interrupted
# .split main thread
# The main task starts the subscriber and publisher, and then sets
# itself up as a listening proxy. The listener runs as a child thread:
def main():
# Start child threads
ctx = zmq.Context.instance()
p_thread1 = Thread(target=publisher_thread, args=(6000,'A'))
p_thread2 = Thread(target=publisher_thread, args=(7000,'B'))
s_thread = Thread(target=subscriber_thread)
p_thread1.start()
p_thread2.start()
s_thread.start()
pipe = zpipe(ctx)
subscriber = ctx.socket(zmq.XSUB)
subscriber.connect("tcp://localhost:6000")
subscriber.connect("tcp://localhost:7000")
publisher = ctx.socket(zmq.XPUB)
publisher.bind("tcp://*:6001")
l_thread = Thread(target=listener_thread, args=(pipe[1],))
l_thread.start()
try:
monitored_queue(subscriber, publisher, pipe[0], 'pub', 'sub')
except KeyboardInterrupt:
print ("Interrupted")
del subscriber, publisher, pipe
ctx.term()
if __name__ == '__main__':
main()
이제는 혼란 스럽습니다. 위의 코드는 실제로 올바르지 않습니다. XPUB/XSUB 문서에 따르면 XPUB/XSUB 쪽 모두에서 bind() 여야하며 구독자와 게시자 모두 connect() (PDF에 연결된 코드 1 권, 48 페이지)를 사용해야합니다. 한 번 더 코드를 업로드 할 수 없지만 아이디어를 얻을 수 있습니다. – vivekv