2014-02-14 2 views
1

ZMQ를 사용하여 다중 게시자/다중 가입자 토폴로지를 구축하려고합니다. 약간의 수정을 통해 espresso.py 샘플을 사용하여 예제를 만들었습니다. 내가 zeromq에 대해 상당히 새롭기 때문에 내가하고있는 일이 옳은지 확인하고 싶었다. 자유롭게 비판하고 의견을 말하십시오.ZeroMQ 여러 게시자와 XPUB/XSUB를 사용하는 구독자가 올바른 구현입니까?

나는 기본적으로 몇 가지 교훈을 다뤘습니다.

  • zmq 소켓은 바인드 후()는 연결을 실행할 수 있습니다 즉들을 의미하지 않는다 단일 네트워크 카드 (일명 일반 소켓) 바인딩

  • 만 여러 프로세스에서 하나 개의 포트에 바인딩 할 수 있습니다 (소켓 개발자에게는 매우 혼란 스럽지만 이것은 소켓이 아닙니다.)

  • 프록시와 XPUB/XSUB는 구독자가 모든 게시자를 알아내어 연결할 필요가 없을 때 패턴으로 사용됩니다. 정말 아래 코드에 대한처럼 해달라고 무엇

각 가입자는 별도의 소켓에 결합 것입니다. 이것이 필요한 악이지만, 어쨌든 나는 이것이 올바르게 보이지 않는다고 생각하고 있었다.

여기 내 샘플 코드입니다.

# Espresso Pattern 
# This shows how to capture data using a pub-sub proxy 
# 

import time 

from random import randint 
from string import uppercase 
from threading import Thread 

import zmq 
from zmq.devices import monitored_queue 

from zhelpers import zpipe 

# The subscriber thread requests messages starting with 
# A and B, then reads and counts incoming messages. 


def subscriber_thread(): 
    ctx = zmq.Context.instance() 

    # Subscribe to "A" and "B" 
    subscriber = ctx.socket(zmq.SUB) 
    subscriber.connect("tcp://localhost:6001") 
    subscriber.setsockopt(zmq.SUBSCRIBE, b"A") 
    subscriber.setsockopt(zmq.SUBSCRIBE, b"B") 

    count = 0 
    while True: 
     try: 
      msg = subscriber.recv_multipart() 
     except zmq.ZMQError as e: 
      if e.errno == zmq.ETERM: 
       break   # Interrupted 
      else: 
       raise 
     count += 1 

    print ("Subscriber received %d messages" % count) 


# .split publisher thread 
# The publisher sends random messages starting with A-J: 

def publisher_thread(port, char): 
    ctx = zmq.Context.instance() 

    publisher = ctx.socket(zmq.PUB) 
    publisher.bind("tcp://*:"+str(port)) 

    while True: 
     string = "%s-%05d" % (char, randint(port, port+500)) 
     try: 
      publisher.send(string) 
     except zmq.ZMQError as e: 
      if e.errno == zmq.ETERM: 
       break   # Interrupted 
      else: 
       raise 
     time.sleep(0.1)   # Wait for 1/10th second 

# .split listener thread 
# The listener receives all messages flowing through the proxy, on its 
# pipe. Here, the pipe is a pair of ZMQ_PAIR sockets that connects 
# attached child threads via inproc. In other languages your mileage may vary: 

def listener_thread(pipe): 

    # Print everything that arrives on pipe 
    while True: 
     try: 
      print (pipe.recv_multipart()) 
     except zmq.ZMQError as e: 
      if e.errno == zmq.ETERM: 
       break   # Interrupted 


# .split main thread 
# The main task starts the subscriber and publisher, and then sets 
# itself up as a listening proxy. The listener runs as a child thread: 

def main(): 

    # Start child threads 
    ctx = zmq.Context.instance() 
    p_thread1 = Thread(target=publisher_thread, args=(6000,'A')) 
    p_thread2 = Thread(target=publisher_thread, args=(7000,'B')) 
    s_thread = Thread(target=subscriber_thread) 
    p_thread1.start() 
    p_thread2.start() 
    s_thread.start() 

    pipe = zpipe(ctx) 

    subscriber = ctx.socket(zmq.XSUB) 
    subscriber.connect("tcp://localhost:6000") 
    subscriber.connect("tcp://localhost:7000") 

    publisher = ctx.socket(zmq.XPUB) 
    publisher.bind("tcp://*:6001") 

    l_thread = Thread(target=listener_thread, args=(pipe[1],)) 
    l_thread.start() 

    try: 
     monitored_queue(subscriber, publisher, pipe[0], 'pub', 'sub') 
    except KeyboardInterrupt: 
     print ("Interrupted") 

    del subscriber, publisher, pipe 
    ctx.term() 

if __name__ == '__main__': 
    main() 
+1

이제는 혼란 스럽습니다. 위의 코드는 실제로 올바르지 않습니다. XPUB/XSUB 문서에 따르면 XPUB/XSUB 쪽 모두에서 bind() 여야하며 구독자와 게시자 모두 connect() (PDF에 연결된 코드 1 권, 48 페이지)를 사용해야합니다. 한 번 더 코드를 업로드 할 수 없지만 아이디어를 얻을 수 있습니다. – vivekv

답변

2

저는 ZeroMQ github 페이지에서 문제를 제기했으며 응답을 받았습니다. ZeroMQ의 알려진 버그는 구독 메시지의 수신자가 완전히 준비되기 전에 구독 요청을하는 다른 스레드에서 게시 및 구독이 발생하고 있기 때문에 발생합니다. 자세한 내용은 여기에서 찾을 수 있습니다.

https://github.com/zeromq/libzmq/issues/897

나는 같은 문제에 실수를 한단다 다른 사람을 위해 모든 것을 공유 여기에 문제

https://gist.github.com/vivekfantain/9021979

을 시뮬레이션하기 위해 노력했다.

+0

한 가지 더 중요한 점은 XPUB/XSUB가 ZMQ 2.xx 버전에서 잘못되었습니다. 나는 그것을 4.x 버전에서만 작동하도록 만들었다. – vivekv

관련 문제