나는 '호출 가능'메소드를 구현하는 threading.Thread
서브 클래스를 작성하고 있습니다.이 코드를 개선하는 방법
일반적으로 스레드의 임의의 메서드를 사용하는 메서드는 스레드가 호출하는 모든 스레드에서 실행됩니다. 이 메서드는 호출 된 스레드에서 실행되고 호출 스레드에서 추가 실행을 차단하거나 메서드를 만드는 데 사용되는 데코레이터에 따라 결과를 가져 오는 수단을 반환합니다. 코드는 다음과 같습니다.
import threading
import collections
class CallableThread(threading.Thread):
def __init__(self, *args, **kwargs):
#_enqueued_calls is used to store tuples that encode a functin call. It is processed by the run method
self._enqueued_calls = collections.deque()
# _enqueue_call_permission is for callers to signal that they have placed something on the queue
self._enqueue_call_permission = threading.Condition()
super(CallableThread, self).__init__(*args, **kwargs)
@staticmethod
def blocking_method(f):
u"""A decorator function to implement a blocking method on a thread"""
# the returned function enqueues the decorated function and blocks until the decorated function
# is called and returns. It then returns the value unmodified. The code in register runs
# in the calling thread and the decorated method runs in thread that it is called on
def register(self, *args, **kwargs):
call_complete = threading.Condition()
response = collections.deque()
with self._enqueue_call_permission:
self._enqueued_calls.append(((f, self, args, kwargs), response, call_complete))
self._enqueue_call_permission.notify()
with call_complete:
if not response:
call_complete.wait()
return response.popleft()
return register
@staticmethod
def nonblocking_method(f):
u"""A decorator function to implement a non-blocking method on a thread"""
# the returned function enqueues the decorated function and returns a tuple consisting of a condition
# to wait for and a deque to read the result out of. The code in register runs in the calling thread
# and the decorated method runs in thread that it is called on
def register(self, *args, **kwargs):
call_complete = threading.Condition()
response = collections.deque()
with self._enqueue_call_permission:
self._enqueued_calls.append(((f, self, args, kwargs), None, None))
self._enqueue_call_permission.notify()
return call_complete, response
return register
def run(self):
self._run = True
while self._run: # while we've not been killed
with self._enqueue_call_permission: # get the condition so we can wait on it.
if not self._enqueued_calls:
self._enqueue_call_permission.wait() # wait if we need to
while self._enqueued_calls:
((f, self, args, kwargs), response_deque, call_complete) = self._enqueued_calls.popleft()
with call_complete:
response_deque.append(f(self, *args, **kwargs))
call_complete.notify()
def stop(self):
u""" Signal the thread to stop"""
self._run = False
if __name__=='__main__':
class TestThread(CallableThread):
u"""Increment a counter on each call and print the value"""
counter = 0
@CallableThread.blocking_method
def increment(self, tag):
print "{0} FROM: {1}".format(self.counter, tag)
self.counter += 1
class ThreadClient(threading.Thread):
def __init__(self, callable_thread, tag):
self.callable_thread = callable_thread
self.tag = tag
super(ThreadClient, self).__init__()
def run(self):
for i in range(0, 4):
self.callable_thread.increment(self.tag)
t = TestThread()
t.start()
clients = [ThreadClient(t, i) for i in range(0, 10)]
for client in clients:
client.start()
## client.join()
for client in clients:
client.join()
t.stop()
분명히 볼 수 있듯이 정적 메소드를 데코레이터로 사용하고 있습니다. 데코레이터는 적용된 메서드를 사용하여 데코 레이팅 된 함수를 호출 된 인수와 함께 완료하는 threading.Condition 인스턴스와 완료시 notify
의 인스턴스와 collections.deque
인스턴스를 반환합니다.
제안 사항? 특히 이름 지정, 아키텍처 포인트 및 견고성에 관심이 있습니다.
편집 : 인터프리터에서 멀리있는 동안 제안 사항을 기반으로 변경 한 일부 내용이 코드를 위반하여 방금 수정했습니다.
_enqueue_call_permission과 관련된 오타가 있다고 생각합니다 (def 실행시) – cpf