2014-11-07 2 views
1

일부 모니터링 도구를 작성 중이며 이러한 도구 중 하나는 NFS 마운트 파일 시스템 목록을 처리하고 각 공유에 테스트 파일을 쓰려고 시도합니다. 나는 테스트중인 각 파일러에 대한 스레드를 생성합니다. 특정 시간 초과 후 여전히 실행중인 모든 스레드를 종료하고 싶습니다. 스레드에서 출력을 수집 할 필요가 없으며 모든 스레드에 조인하거나 연결할 필요가 없습니다. 시간 초과 후에도 실행중인 스레드를 중지해야합니다.파이썬 스레딩/서브 프로세스; 하위 프로세스가 여전히 실행 중일 때 스레드 개체가 유효하지 않습니다.

의도적으로 각 스레드에서 sleep (300)을 실행하므로 모든 활성 스레드를 반복하고 죽이려고 시도 할 때 각 스레드에서 생성 된 프로세스가 아직 끝나지 않은 상태인지 알 수 있습니다. 이 방법은 5 ~ 20 개 스레드에서 작동하지만 self.pid가 더 이상 유효하지 않다는 스레드에서 결국 실패합니다. 이 스레드는 임의적이며 마지막 실행과 반드시 ​​동일한 스레드 일 필요는 없습니다.

2014-11-10 09:00:22 CST there are 173 NFS write test threads active after the timeout: 
2014-11-10 09:00:22 CST theadname=Thread-165 
2014-11-10 09:00:22 CST thread 'hostname1' (thread 165) is still alive 
2014-11-10 09:00:22 CST killing thread for 'hostname1' (pid=XX) with SIGTERM 
2014-11-10 09:00:22 CST theadname=Thread-97 
2014-11-10 09:00:22 CST thread 'hostname2' (thread 97) is still alive 
2014-11-10 09:00:22 CST  NFS write test for 'hostname1' completed in 60 seconds 
2014-11-10 09:00:22 CST killing thread for 'hostname2' (pid=XX) with SIGTERM 
2014-11-10 09:00:22 CST theadname=Thread-66 
2014-11-10 09:00:22 CST thread 'hostname3' (thread 66) is still alive 
2014-11-10 09:00:22 CST  NFS write test for 'hostname2' completed in 60 seconds 
2014-11-10 09:00:22 CST killing thread for 'hostname3' (pid=XX) with SIGTERM 
2014-11-10 09:00:22 CST theadname=Thread-121 
2014-11-10 09:00:22 CST thread 'hostname4' (thread 121) is still alive 
2014-11-10 09:00:22 CST killing thread for 'hostname4' (pid=XX) with SIGTERM 
Traceback (most recent call last): 
2014-11-10 09:00:22 CST  NFS write test for 'hostname3' completed in 60 seconds 
    File "./NFSWriteTestCheck.py", line 199, in <module> 
    thr.kill() 
    File "./NFSWriteTestCheck.py", line 84, in kill 
    os.kill(self.process.pid, signal.SIGKILL) 
AttributeError: 'NoneType' object has no attribute 

이 오류는 프로세스가 계속 쉘에서 PS 확인, 실행 표시되는 시점에서 :

class NFSWriteTestThread(threading.Thread): 

    def __init__(self, filer, base_mnt_point, number): 
     super(NFSWriteTestThread, self).__init__() 
     self.tname = filer 
     self.tnum  = number 
     self.filer = filer 
     self.mntpt = base_mnt_point 
     self.process = None 
     self.pid  = None 

    def run(self): 
     start = time.time() 
#  self.process = subprocess.Popen(['/bin/dd', 'if=/dev/zero', 'bs=1M', 'count=5', 'of=' + self.testfile], shell=False) 
     self.process = subprocess.Popen(['/bin/sleep', '300'], shell=False) 
     time.sleep(1) 
     logger.debug("DEBUG: %s=%d" % (self.tname, self.process.pid)) 
     self.pid  = self.process.pid 
     logger.info(" NFS write test command initiaited on '%s', pid=%d" % (self.filer, self.pid)) 
     self.process.wait() 
#  self.output, self.error = self.process.communicate() 
     end = time.time() 
     logger.info(" NFS write test for '%s' completed in %d seconds" % (self.filer, end - start)) 
     return 

    def getThreadName(self): 
     return self.tname 

    def getThreadNum(self): 
     return self.tnum 

    def getThreadPID(self): 
     if self.pid: 
      return self.pid 
     else: 
      return "unknown" 

    def isAlive(self): 
     if not self.process: 
      logger.debug("Error: self.process is invalid (%s)" % type(self.process)) 
#  if self.process.poll(): 
#   logger.info("NFS write test operation for thread '%s' is still active" % self.filer) 
#  else: 
#   logger.info("NFS write test operation for thread '%s' is inactive" % self.filer) 
     return 

    def terminate(self): 
     os.kill(self.process.pid, signal.SIGTERM) 
     return 

    def kill(self): 
     os.kill(self.process.pid, signal.SIGKILL) 
     return 

def initLogging(config): 

    logfile = os.path.join(config['logdir'], config['logfilename']) 
    fformat = logging.Formatter('%(asctime)s %(message)s', "%Y-%m-%d %H:%M:%S %Z") 
    cformat = logging.Formatter('%(asctime)s %(message)s', "%Y-%m-%d %H:%M:%S %Z") 
    clogger = None 
    flogger = None 

    if config['debug']: 
     loglevel = logging.DEBUG 

    if not os.path.exists(config['logdir']): 
     os.makedirs(config['logdir']) 
     os.chmod(config['logdir'], 0700) 
     os.chown(config['logdir'], 0, 0) 

    try: 
     logger = logging.getLogger('main') 
     logger.setLevel(logging.DEBUG) 

     # Define a file logger 
     flogger = logging.FileHandler(logfile, 'w') 
     flogger.setLevel(logging.DEBUG) 
     flogger.setFormatter(fformat) 
     logger.addHandler(flogger) 

     # Define a console logger if verbose 
     if config['verbose']: 
      clogger = logging.StreamHandler() 
      clogger.setLevel(logging.DEBUG) 
      clogger.setFormatter(cformat) 
      logger.addHandler(clogger) 
    except Exception, error: 
     print "Error: Unable to initialize file logging: %s" % error 
     sys.exit(1) 

    logger.info("Script initiated.") 

    logger.info("Using the following configuration:") 
    for key, value in sorted(config.iteritems()): 
     logger.info(" %20s = '%-s'" % (key, value)) 

    return logger 

def parseConfigFile(cfg): 

    if not os.path.isfile(cfg['cfgfile']) or not os.access(cfg['cfgfile'], os.R_OK): 
     print "Error: '%s' does not exist or is not readable, terminating." % cfg['cfgfile'] 
     sys.exit(1) 

    config = SafeConfigParser() 
    config.read(cfg['cfgfile']) 

    _cfg = dict(config.items(cfg['cfgfilestanza'])) 

    _cfgfilers = config.get(cfg['cfgfilestanza'], 'managed_filers') 
    _tmpfilers = _cfgfilers.split(',') 

    # populate a list containing all filers which will be meged into the global cfg[] dict 
    _cfg['filers'] = [] 

    for _f in _tmpfilers: 
     _cfg['filers'].append(_f.strip()) 

    return _cfg 


logger = initLogging(cfg) 
cfg.update(parseConfigFile(cfg)) 

threads  = [] 
numThreads = 0 

for filer in cfg['filers']: 
    numThreads += 1 
    logger.debug(" spawning NFS wite test thread for '%s', thread number %s" % (filer, numThreads)) 
    t = NFSWriteTestThread(filer, cfg['base_mnt_point'], numThreads) 
    t.start() 
    threads.append(t) 
# time.sleep(1) 

logger.info("spawned %d NFS write test child threads" % numThreads) 

logger.info("sleeping for %d seconds" % cfg['timeout']) 
time.sleep(cfg['timeout']) 

if (threading.activeCount() > 1): 
    logger.info("there are %d NFS write test threads active after the timeout:" % (threading.activeCount() - 1)) 
    for thr in threading.enumerate(): 
     logger.debug("theadname=%s" % thr.name) 
     if re.match("MainThread", thr.getName()): 
      pass 
     else: 
      logger.info("thread '%s' (thread %d) is still alive" % (thr.getThreadName(), thr.getThreadNum())) 
#   thr.isAlive() 
      logger.info("killing thread for '%s' (pid=XX) with SIGTERM" % (thr.getThreadName())) 
#   logger.info("killing thread for '%s' (pid=%d) with SIGTERM" % (thr.getThreadName(), thr.getThreadPID())) 
      thr.kill() 


logger.info("Script complete") 
sys.exit(0) 

여기에서 출력을 볼 수 있습니다. 스레드 객체가 더 이상 유효하지 않은 이유는 무엇입니까? 이 오류가 발생 된 시점에서, 스레드 실행이 시점에서해야한다 :

self.process.wait()

그냥 내가 버그 또는 뭔가를 치는거야 궁금 여기 내 머리를 긁적.

답변

1

스레드를 중지 할 수 없으며 수행중인 작업 만 방해 할 수 있습니다. 귀하의 경우 스레드가 subprocess.call을 기다리고 있습니다. 스레드가 이벤트를 기다리지 않으므로 이벤트 설정은 아무 효과가 없습니다. 여기의 해결책은 Popen 객체가 필요하다는 것을 의미하는 자식 프로세스를 죽이는 것입니다.

Popen 개체를 편리하게 사용할 수 있도록 구현 메서드를 직접 실행 메서드에 넣습니다.

class NFSWriteTestThread(threading.Thread): 

    def __init__(self, filer, mntpoint, number): 
     super(NFSWriteTestThread, self).__init__() 
     self.name = filer 
     self.filer = filer 
     self.mntpt = mntpoint 
     self.tnum = number 
     self._proc = None 

    def run(self): 
     testfile = "%s/%s/test/test.%s" % (mountpoint, filer, filer) 
     testcmd = "/bin/bash -c '/bin/dd if=/dev/zero bs=1024 count=1024 of=" + testfile + " >/dev/null 2>/dev/null; sleep 120'" 
     self._proc = subprocess.Popen(testcmd, shell=True) 
     self._proc.wait() 
     return 

    def getName(self): 
     return self.name 

    def kill(self): 
     if self._proc: 
      self._proc.terminate() 

    def stopped(self): 
     if self._proc: 
      return self._proc.poll() is not None 
     else: 
      return True 
+1

나는이 접근법을 한 가지 점으로 생각하면서 "AttributeError : 'Popen'객체에 'terminate'속성이 없다는 불만을 계속 표시했습니다. 내 원래 코드를 업데이트하여 현재 버전을 표시합니다. – BenH

+0

'terminate'는 파이썬 2.6에서 새로 추가되었습니다. 또한 2.7이어야합니다. 예를 들어 redhat 5를 사용하는 경우 기본 파이썬은 2.4이고 해당되지 않습니다. 'terminate '는 리눅스와 윈도우가 똑같이 작동하도록하는 방법 일 뿐이다.'신호를 가져올 수있다','self._proc.send_signal (signal.SIGTERM)' – tdelaney

+1

나는 이것으로 시간을 보내고있다. 나는 종결()와 kill()을 포기하고 지금은 os.kill을 사용하고있다 .. 그러나 process.pid를 평가할 때 일관성없는 결과를 얻는다. 프로세스가 여전히 100 % 실행 중일 때, ps와 스레드가 현재 sleep (300)을 실행하고 있다는 사실을 확인했지만, 프로세스가 여전히 실행 중이더라도 self.pid는 언젠가 설정이 해제됩니다. – BenH

관련 문제