2009-07-15 2 views
34

나는 생성기 함수를 생성 한 다음 그것을 새로운 쓰레드로 전달하는 다중 쓰레드 프로그램을 가지고있다. 내가 공유하기를 원하기 때문에 자연스럽게 전역 적으로 각 쓰레드는 생성자로부터 다음 값을 얻을 수있다.Generators Threadsafe입니까?

이와 같은 생성기를 사용하는 것이 안전합니까? 아니면 여러 스레드에서 공유 생성기에 액세스하는 문제/조건이 실행됩니까?

그렇지 않은 경우 문제에 접근하는 더 좋은 방법이 있습니까? 목록을 통해 순환하고 스레드가 호출하는 다음 값을 생성하는 무언가가 필요합니다.

답변

49

스레드로부터 안전하지 않습니다. 동시 호출이 인터리브 될 수 있으며 지역 변수를 혼란시킬 수 있습니다.

일반적인 접근 방식은 마스터 - 슬레이브 패턴 (현재는 PC에서 farmer-worker 패턴이라고 함)을 사용하는 것입니다. 데이터를 생성하는 세 번째 스레드를 만들고 마스터와 슬레이브 사이에 큐를 추가합니다. 여기서 슬레이브는 큐에서 읽을 것이고 마스터는 스레드에 쓸 것입니다. 표준 큐 모듈은 필요한 스레드 안전성을 제공하고 슬레이브가 더 많은 데이터를 읽을 준비가 될 때까지 마스터를 차단합니다.

+7

확실히 +1합니다. 적용 가능한 경우 스레딩 시스템을 구성하는 훌륭한 방법입니다. (대부분이 시간이며 확실히이 작업입니다). –

-7

: 당신은 발전기 및 멀티 스레딩에 대한 흥미로운 정보를 찾을 수 있습니다. CPython에서 GIL은 특정 시점에 하나의 스레드 만 코드를 실행할 수 있기 때문에 파이썬 객체에 대한 모든 작업을 스레드 안전으로 만듭니다.

http://en.wikipedia.org/wiki/Global_Interpreter_Lock

+1

"GIL은 파이썬 객체의 모든 작업을 스레드 세이프로 만듭니다"- 응? 모든 작업은 원자가가 아닙니다. –

+6

이것은 오도 된 일입니다. GIL은 다중 스레드 환경에서 파이썬 코드가 파이썬 상태를 손상시키지 않는다는 것을 의미합니다. 바이트 코드 op 중간에서 스레드를 변경할 수는 없습니다. 예를 들어 공유 dict를 손상시키지 않고 수정할 수 있습니다. 두 바이트 코드 연산간에 스레드를 변경할 수 있습니다. –

40

편집 아래 벤치 마크를 추가 할 수 있습니다.

발전기를 자물쇠로 감쌀 수 있습니다. 예를 들어,

import threading 
class LockedIterator(object): 
    def __init__(self, it): 
     self.lock = threading.Lock() 
     self.it = it.__iter__() 

    def __iter__(self): return self 

    def next(self): 
     self.lock.acquire() 
     try: 
      return self.it.next() 
     finally: 
      self.lock.release() 

gen = [x*2 for x in [1,2,3,4]] 
g2 = LockedIterator(gen) 
print list(g2) 

잠금 내 시스템에 50ms가 소요 대기열이 350ms 소요됩니다. 대기열은 실제로 대기열이있을 때 유용합니다. 예를 들어 들어오는 HTTP 요청이 있고 작업자 스레드에서 처리하도록 대기열에 넣으려는 경우입니다. (이것은 파이썬 반복자 모델에 맞지 않습니다. 일단 반복자가 아이템을 다 쓰면, 끝난 것입니다.) 실제로 반복자가 있다면, LockedIterator는 스레드 안전성을 높이는 더 빠르고 간단한 방법입니다.

from datetime import datetime 
import threading 
num_worker_threads = 4 

class LockedIterator(object): 
    def __init__(self, it): 
     self.lock = threading.Lock() 
     self.it = it.__iter__() 

    def __iter__(self): return self 

    def next(self): 
     self.lock.acquire() 
     try: 
      return self.it.next() 
     finally: 
      self.lock.release() 

def test_locked(it): 
    it = LockedIterator(it) 
    def worker(): 
     try: 
      for i in it: 
       pass 
     except Exception, e: 
      print e 
      raise 

    threads = [] 
    for i in range(num_worker_threads): 
     t = threading.Thread(target=worker) 
     threads.append(t) 
     t.start() 

    for t in threads: 
     t.join() 

def test_queue(it): 
    from Queue import Queue 
    def worker(): 
     try: 
      while True: 
       item = q.get() 
       q.task_done() 
     except Exception, e: 
      print e 
      raise 

    q = Queue() 
    for i in range(num_worker_threads): 
     t = threading.Thread(target=worker) 
     t.setDaemon(True) 
     t.start() 

    t1 = datetime.now() 

    for item in it: 
     q.put(item) 

    q.join() 

start_time = datetime.now() 
it = [x*2 for x in range(1,10000)] 

test_locked(it) 
#test_queue(it) 
end_time = datetime.now() 
took = end_time-start_time 
print "took %.01f" % ((took.seconds + took.microseconds/1000000.0)*1000) 
+1

Queue.Queue를 사용하면 효율성이 떨어지지 만 아름답게 수행됩니다. – gooli

관련 문제