2012-07-06 3 views
2

UDP 메시지를 수집하고 1 초마다 처리하는 응용 프로그램을 작성 중입니다. 수신 메시지 목록을 변경하는 것이 위험하지 않고 : 나는 쉽게 (델 self.listener.messages [길이 0]) 목록에서 시작 값을 제거 할 수 있는지 확실하지 않다이 python 코드 스레드가 안전합니까 (꼬인 스레드)?

from twisted.internet.protocol import DatagramProtocol 
from twisted.internet import reactor 
import threading 
import time 

class UdpListener(DatagramProtocol): 

    messages = [] 

    def datagramReceived(self, data, (host, port)): 
     self.messages.append(data) 

class Messenger(threading.Thread): 

    listener = None 

    def __init__(self): 
     threading.Thread.__init__(self) 

    def run(self): 
     while True: 
      time.sleep(1) 
      recivedMessages = self.listener.messages 
      length = len(recivedMessages) 
      messagesToProccess = recivedMessages[0:length] 
      #doSomethingWithMessages(messagesToProccess) 
      del self.listener.messages[0:length] 
      print(length) 

listener = UdpListener() 

messenger = Messenger() 
messenger.listener = listener 
messenger.start() 

reactor.listenUDP(5556, listener) 
reactor.run() 

: 같은

응용 프로그램 프로토 타입 보인다 응용 프로그램이 충돌합니다.

업데이트 - 잠금 버전

class Messenger(threading.Thread): 

listener = None 
lock = threading.Lock() 

def __init__(self): 
    threading.Thread.__init__(self) 

def run(self): 
    while True: 
     time.sleep(1) 
     recivedMessages = self.listener.messages 
     self.lock.acquire() 
     try: 
      length = len(recivedMessages) 
      messagesToProccess = recivedMessages[0:length] 
      del self.listener.messages[0:length] 
     except Exception as e: 
      raise e 
     finally: 
      self.lock.release() 

     #doSomethingWithMessages(messagesToProccess) 
     print(length) 
+3

했다 추측 * 보통 * 당신이 경우 여기 callingLater과 트릭에 대한 –

답변

6

귀하의 코드는 스레드로부터 안전하지 않습니다, 아니. messages 주위에 자물쇠가 있어야합니다.

그러나 여기에는 스레드가 필요 없습니다. 왜 안되니?

from twisted.internet.protocol import DatagramProtocol 
from twisted.internet import reactor 

class UdpListener(DatagramProtocol): 
    callingLater = False 

    messages = [] 

    def process(self): 
     doSomethingWithMessages(self.messages) 
     self.messages = [] 
     self.callingLater = False 

    def datagramReceived(self, data, (host, port)): 
     self.messages.append(data) 
     if not self.callingLater: 
      reactor.callLater(1.0, self.process) 
      self.callingLater = True 

listener = UdpListener() 

reactor.listenUDP(5556, listener) 
reactor.run() 

UPDATE : 여기에 원래 버전은 교육 목적으로 만, 잠금 장치와 함께 작동 할 방법이다. 이것은 효율적이지 않고 버그가 발생하기 쉽지 않습니다. 편집 : 모든 메시지 로직을 UdpListener으로 분리하여 사용하는 클래스는 끈적 거리는 내부 세부 정보를 알 필요가 없습니다.

from twisted.internet.protocol import DatagramProtocol 
from twisted.internet import reactor 
import threading 
import time 

class UdpListener(DatagramProtocol): 
    message_lock = threading.Lock() 
    messages = [] 

    def datagramReceived(self, data, (host, port)): 
     with self.message_lock: 
      self.messages.append(data) 

    def getAndClearMessages(self): 
     with self.message_lock: 
      res = self.messages 
      self.messages = [] 
     return res 

class Messenger(threading.Thread): 

    listener = None 

    def __init__(self): 
     threading.Thread.__init__(self) 

    def run(self): 
     while True: 
      time.sleep(1) 
      recivedMessages = self.listener.getAndClearMessages() 
      length = len(recivedMessages) 
      #doSomethingWithMessages(recivedMessages) 
      print(length) 

listener = UdpListener() 

messenger = Messenger() 
messenger.listener = listener 
messenger.start() 

reactor.listenUDP(5556, listener) 
reactor.run() 
+0

감사하다 뭔가 잘못 방법을하고있는 의미 - I 그것을 사용할 것입니다. – mrok

+0

실력을 향상시키기 위해 "question"(우리 업데이트)에 잠금 기능이있는 버전을 추가했습니다. 지금이 더 나은지 말해 줄 수 있니? – mrok

+1

@mrok :'주위에 잠금 장치가 필요합니다.append (data)'그렇지 않으면 실제로 잠금을 수행하지 않습니다. 내 게시물을 자물쇠로 ('try/finally '대신에'with' 표기법으로) 보이는 것과 같이 업데이트했습니다. – Claudiu

0

이 목적을 위해 정확하게 구현 된 DeferredQueue로 구현하지 않는 이유는 무엇입니까? 쓰레드를 사용하고 싶다면 약간의주의가 필요합니다. 여기

스레딩을 허용하는 DeferredQueue와 예는 다음과 같습니다

class UdpListener(DatagramProtocol): 

    def __init__(self) 
     self._messages = DeferredQueue() 

    def datagramReceived(self, data, (host, port)): 
     self._messages.put(message) 

    @inlineCallbacks 
    def _wait_for_and_process_next_message(self): 

     # Get message from queue through a deferred call from the DeferredQueue 
     # Here we use @inlineCallbacks, so we assign the result from yield 
     # which is the new message, and will "block" (actually releasing control to Twisted) until a message gets in 
     message = yield self._message_queue.get() 

     # Do something with your message here, and ensure you catch any exceptions! 
     # If your message processing may be long, you may wish to run it in another thread, 
     # and because of @inlineCallbacks, this call will "block" until your function finishes. 
     # In case you did this, ensure you read the notes below. 
     yield threads.deferToThread(my_long_function, message) 

     # Schedule an immediate call to this method again in order to process next message 
     self.wait_for_and_process_next_message() 

    def wait_for_and_process_next_message(self): 
     reactor.callLater(0, self._wait_for_and_process_next_message) 

    def initialize(self): 
     # Call this during your application bootstrapping, so you start processing messages 
     self.wait_for_and_process_next_message() 

경우에 당신이 트위스트 스레드 풀에 메시지 처리를 연기하기 위해 선택하는 것이주의하는 것이 매우 중요 코드합니다 (threads.deferToThread 사용) 다른 스레드에서 실행 중이어야합니다. Twisted에서는 프로토콜이 스레드 안전 객체 (http://twistedmatrix.com/documents/13.2.0/core/howto/threading.html#auto0)가 아닌 다른 스레드의 메시지에 응답 할 가능성이 높습니다.

def _send_message_critical_section(self, message): 
    self.transport.write(message, (self.host, self.port)) 

def send_message(self, message): 
    reactor.callFromThread(self._send_message_critical_section, message) 

기타 변경이 완료 :

  • _messagesmessages 변수 명칭 변경, 그것은 당신이 예에서와 같이 중요한 자원 transport을 보호하기 위해 reactor.callFromThread()을 사용하는 것이 경우에 대한

    전적으로 사적으로 간주되어야합니다.

  • __init__() 메서드 내에서 _messages 초기화를 이동하고 self._messages에 할당합니다. 그렇지 않으면 메시지 목록이 모든 인스턴스간에 공유됩니다! 나는 당신이 트위스트에 스레드를 시작하는 단 하나 개의 클래스의 인스턴스를하지만 ... (Variables inside and outside of a class __init__() function)
관련 문제