2014-01-15 2 views
3

작업을 배출하는 서버와이를 실행하는 여러 작업자가 서버/클라이언트 스크립트를 작성하려고합니다. 문제는 인공 호흡기에 많은 작업이있어 ​​심장 박동으로 기억을 채울 수 있다는 것입니다. 바인딩하기 전에 HWM을 설정하려고했지만 성공하지 못했습니다. 작업자가 연결 되 자마자 메시지를 계속 보내면서 설정 한 HWM을 완전히 무시합니다. 또한 수행 된 작업을 기록하는 싱크가 있습니다.ZeroMQ : PUSH의 HWM이 작동하지 않습니다.

server.py

import zmq 

def ventilate(): 
    context = zmq.Context() 

    # Socket to send messages on 
    sender = context.socket(zmq.PUSH) 
    sender.setsockopt(zmq.SNDHWM, 30) #Big messages, so I don't want to keep too many in queue 
    sender.bind("tcp://*:5557") 


    # Socket with direct access to the sink: used to syncronize start of batch 
    sink = context.socket(zmq.PUSH) 
    sink.connect("tcp://localhost:5558") 

    print "Sending tasks to workers…" 

    # The first message is "0" and signals start of batch 
    sink.send('0') 
    print "Sent starting signal" 

    while True: 
     sender.send("Message") 



if __name__=="__main__": 
    ventilate() 

worker.py

import zmq 
from multiprocessing import Process 

def work(): 
    context = zmq.Context() 

    # Socket to receive messages on 
    receiver = context.socket(zmq.PULL) 
    receiver.connect("tcp://localhost:5557") 

    # Socket to send messages to 
    sender = context.socket(zmq.PUSH) 
    sender.connect("tcp://localhost:5558") 

    # Process t asks forever 
    while True: 
     msg = receiver.recv_msg() 
     print "Doing sth with msg %s"%(msg)  
     sender.send("Message %s done"%(msg)) 

if __name__ == "__main__": 
    for worker in range(10):   
     Process(target=work).start() 

sink.py

import zmq 

def sink(): 
    context = zmq.Context() 

    # Socket to receive messages on 
    receiver = context.socket(zmq.PULL) 
    receiver.bind("tcp://*:5558") 

    # Wait for start of batch 
    s = receiver.recv() 
    print "Received start signal" 
    while True: 
     msg = receiver.recv_msg() 
     print msg 


if __name__=="__main__": 
    sink() 
+0

문제를 재현 해 보겠습니다. 사용중인 PyZMQ 및 ZMQ의 버전을 알려주시겠습니까? 'zmq.zmq_version()'과'zmq .__ version__'을 실행하십시오. –

+0

ZMQ 버전은 4.0.3이고 pyzmq는 13.1.0 – Elvin

+0

입니다 .- 어, 그게 짜증나는 조합입니다. 당신은 pyzmq 14.0.1로 업데이트하고 그걸로 테스트 할 수 있습니까? (사용하는 zmq 버전이 마음에 듭니다. 그냥 알려주세요). 나는 zmq 3.x와 함께 pyzmq 13.1.0에있다.x를 Windows에서 실행하고 zmq 버전을 pyzmq v14로 업데이트하지 않고 변경하는 것이 좋지만 시도하기 전에 해당 버전의 문제점을 계속 확인하고 싶습니다. –

답변

5

좋아, 나는 연극 주위, 나는 문제가 생각하지 않습니다했다 PUSH HWM을 사용하지만 PULL을 위해 HWM을 설정할 수는 없습니다. this documentation을 보면 HWM에서 N/A라고 표시되는 것을 볼 수 있습니다.

PULL 소켓은 각각 수백 개의 메시지를 사용하고있는 것처럼 보입니다. PULL 소켓에 아무것도하지 않은 경우를 대비하여 HWM을 설정하려고 시도했습니다. 나는 증가하는 정수로 메시지를 보내도록 인공 호흡기를 변경하고 recv()에 대한 호출 사이에 2 초간 대기하도록 풀의 각 작업자를 변경하여이 사실을 입증했습니다. 작업자는 크게 다른 정수로 메시지를 처리하고 있음을 인쇄합니다. 예를 들어, 한 작업자가 메시지 10에서 작업하고 다음 작업은 메시지 400에서 작업합니다. 시간이 지남에 따라 메시지 10을 처리하는 작업자가 메시지 11, 12, 13 등을 처리하는 것을 볼 수 있습니다. 다른 것은 401, 402 등의 처리입니다.

이것은 ZMQ_PULL 소켓이 메시지를 어딘가에 버퍼링하고 있음을 나타냅니다. 따라서 ZMQ_PUSH 소켓에는 HWM이 있지만 PULL 소켓은 실제로 recv()을 호출하여 액세스하지는 않았지만 메시지를 빨리 요청합니다. 따라서 PULL 소켓이 연결된 경우 PUSH HWM이 효과적으로 무시됩니다. 지금까지 볼 수 있듯이 PULL 소켓의 버퍼 길이를 제어 할 수는 없습니다 (RCVHWM 소켓 옵션으로 제어 할 수 있지만 예상대로 표시되지는 않습니다).

이 동작은 ZMQ_PULL HWM 옵션의 요점을 묻습니다.이 옵션은 수신 소켓 HWM도 제어 할 수있는 경우에만 의미가 있습니다.

0MQ people이 무엇인지 확실하지 않거나 버그로 간주되는지 여부를 묻기 시작합니다.

죄송합니다. 더 도움이되지 못했습니다.

+0

지금까지 해 주신 노력에 감사드립니다. 나는 setsockopt (zmq.RCVBUF, 2) 설정이 실제로 느려지는 것을 알았다. 기본적으로이 값은 0으로 설정되어 운영 체제의 기본 버퍼 크기를 사용합니다. 그것이 무엇인지는 알 수 없습니다. 그것은 여전히 ​​내가 원하는 것을 정확히하지는 못하지만, 더 가까이에옵니다. – Elvin

1

ZeroMQ는 소켓의 송신 및 수신 양쪽 끝에 버퍼를 가지고 있으므로 코드에서 PUSH 및 PULL 소켓에 높은 워터 마크를 설정해야합니다 (실제로는 bind() 또는 connect() 앞에 와야합니다).

파이썬 바인딩에서 이것은 socket.hwm = 1을 통해 편리하게 수행되며 ZMQ_SNDHWMZMQ_RCVHWM을 한 번에 설정합니다.

관련 문제