2012-05-25 3 views
2

두 개의 스레드 (생산자와 소비자)가 있고 Queue과 데이터를 공유합니다. 문제는 내가 생산자를 강제로 중단 할 때 가끔 소비자가 잠기는 것입니다.파이썬 스레딩 교착 상태

문서에서 큐가있는 스레드를 취소하면 큐가 손상되어 교착 상태가 발생할 수 있습니다. 명시 적으로 잠금을 얻지는 않지만 Queue.py의 소스를 읽는 것은 putget이 그렇게하고 있다고 말합니다.

내가 스레드를 중단하면 get/put 중간에있을 수 있습니다. 즉 잠금을 사용하고 해제하지 않았습니까? 그것에 대해 무엇을 할 수 있습니까? 가끔 프로듀서를 조기에 종료해야 할 때도 있습니다. 스레드 대신 프로세스를 사용하면 어떤 차이가 있습니까?

+3

어떻게 당신이 스레드를 중단합니까? Ctrl + C? –

+1

이렇게 : http://stackoverflow.com/questions/323972/is-there-any-way-to-kill-a-thread-in-python –

+0

우리는 무슨 일이 일어나고 있는지 알 수 있도록 몇 가지 코드를 게시 할 수 있습니까? – betabandido

답변

0

대부분의 경우 교착 상태는 완료되지 않은 스레드 때문입니다. 당신이 리눅스는 역 추적을 인쇄 할 pyrasite에서 인젝터를 사용할 수있는 경우 당신이 당신의 신호 처리기에 락 (lock)을 사용하는 경우

을 (당신이 프로그램이 걸려 위치를 알 것) - 그럼 아마 당신의 교착 상태 (즉, 조금 복잡하다,

스레드 대신 프로세스를 작성하면 분명히 상황이 바뀌지 만 데이터 교환 및 동기화는 매우 복잡하다는 것을 기억하십시오.

+0

스레드를 "완료"하는 방법은 무엇입니까? 나는 run()이 끝나면 끝났다고 생각했다. 그리고 나는 어떤 시그널 핸들러도 직접 사용하지 않는다. 아마 Queue.py가 할 것인가? http://hg.python.org/cpython/file/2.7/Lib/Queue.py –

+0

예, run()이 끝나면 끝났습니다 (우아함을 위해 주 스레드는 "join()"을해야합니다) -> 디버그 정보를 추가하여 queue.get() 또는 심지어 queue.put() (!)에 걸릴 수 있으므로 스레드 (제작자/기록)가 종료되었는지 확인하십시오. – ddzialak

+0

이 "hanging"은 정확히 무슨 일이 일어나고 있는지 생각합니다. 문제는 하나의 스레드를 중단하여 끝까지 도달하지 못하게하는 것입니다. 어떻게해야합니까? 내 말은, 내가 그걸 중단해야하지만 큐가 멈춘 것 같아 ..? –

0

어쩌면이 도움이 될 것입니다

import threading 

class MyQueue: 
    def __init__(self): 
     self.tasks = [] 
     self.tlock = threading.Semaphore(0) 
     self.dlock = threading.Lock() 
     self.aborted = False 

    def put(self, arg): 
     try: 
      self.dlock.acquire() 
      self.tasks.append(arg) 
     finally: 
      self.dlock.release() 
      self.tlock.release() 

    def get(self): 
     if self.aborted: 
      return None 
     self.tlock.acquire() 
     if self.aborted: 
      self.tlock.release() 
      return None 
     try: 
      self.dlock.acquire() 
      if self.tasks: 
       return self.tasks.pop() 
      else: # executed abort 
       return None 
     finally: 
      self.dlock.release() 

    def abort(self): 
     self.aborted = True 
     self.tlock.release() 

# TESTING 

mq = MyQueue() 
import sys 

def tlog(line): 
    sys.stdout.write("[ %s ] %s\n" % (threading.currentThread().name, line)) 
    sys.stdout.flush() 

def reader(): 
    arg = 1 
    while arg is not None: 
     tlog("start reading") 
     arg = mq.get() 
     tlog("read: %s" % arg) 
    tlog("END") 

import time, random 
def writer(): 
    try: 
     pos = 1 
     while not mq.aborted: 
      x = random.random() * 5 
      tlog("writer sleep (%s)" % x) 
      pending = x 
      while pending > 0: 
       tosleep = min(0.5, pending) 
       if mq.aborted: 
        return 
       time.sleep(tosleep) 
       pending -= tosleep 

      tlog("write: %s" % x) 
      mq.put("POS %s val=%s" % (pos, x)) 
      pos += 1 
    finally: 
     tlog("writer END") 

def testStart(): 
    try: 
     for i in xrange(9): 
      th = threading.Thread(None, reader, "reader %s" % i,(), {}, None) 
      th.start() 
     for i in xrange(3): 
      th = threading.Thread(None, writer, "writer %s" % i,(), {}, None) 
      th.start() 
     time.sleep(30) # seconds for testing 
    finally: 
     print "main thread: abort()" 
     mq.abort() 

if __name__ == "__main__": 
    testStart() 
+0

대단히 감사합니다.하지만 스레드를 중단해야합니다. 위와 같이 작업을 완료하는 데 너무 오래 걸릴 수 있습니다. –