2012-02-11 5 views
0

실행하지 않는 스레드에 문제가 있습니다. 나는 스레드를 설정하는 방법에 관한 문서를 따라 갔고 첫 번째 일괄 처리를 마친 후에는 스레드가 작동하는 것을 제외하고는 훌륭하게 작동합니다. 기본적으로 원격 서버 로그 디렉토리를 로컬에 마운트 한 다음 두 개의 특정 문자열에 대한 로그를 구문 분석합니다. 처음 10 개의 스레드가 끝나면 서버가 멈추고 결코 넘어지지 않습니다. 내가 도대체 ​​뭘 잘못하고있는 겁니까?파이썬 3 스레드가 스레드 수보다 큰 작업을 실행하지 않습니다.

''' 
Created on Feb 10, 2012 
This script exists solely to check the configs of prod servers for oom exceptions 
and restarts 
''' 
import shlex 
import subprocess 
import time 
import re 
import os 
import logging 
import logging.handlers 
import queue 
import threading 
from threading import Lock 
import getpass 


#search string: 
ss = "outofmemory" 
password = getpass.getpass("Please type in your sea1 password to mount the drives locally: ") 
user = getpass.getuser() 

max_threads = 9 
log_home = os.path.expanduser("~") 
log_path = os.path.join(log_home, "Desktop") 
log_file = 'Server Parser.log' 
log_out = os.path.join(log_path, log_file) 
logger = logging.getLogger("Server Parser") 
tg_logger = logging.getLogger('thread') 
tg_logger.setLevel(logging.DEBUG) 
tp_logger = logging.getLogger('tpwipe') 
tp_logger.setLevel(logging.DEBUG) 

outFile = "Server Parser (with {} threads).log".format(max_threads) 
output_path = os.path.join(log_path, outFile) 
logger.setLevel(logging.INFO) 
# create file handler which logs even debug messages 
fh = logging.handlers.RotatingFileHandler(log_out, mode='a', 
              maxBytes=2000000, 
              backupCount=6) 
fh.setLevel(logging.DEBUG) 
# create console handler with a higher log level 
ch = logging.StreamHandler() 
ch.setLevel(logging.INFO) 
# create formatter and add it to the handlers 
formatter = logging.Formatter('%(name)s - %(levelname)s - %(message)s') 
fh.setFormatter(formatter) 
ch.setFormatter(formatter) 
# add the handlers to the logger 
logger.addHandler(fh) 
logger.addHandler(ch) 
tp_logger.addHandler(fh) 
tp_logger.addHandler(ch) 
tg_logger.addHandler(ch) 
tg_logger.addHandler(fh) 
q = queue.Queue() 
lock = Lock() 
class ThreadShredder(threading.Thread): 
    """threaded information aggregator""" 
    def __init__(self, myqueue, search_string, l, thr, user, password): 
     threading.Thread.__init__(self) 
     self.q = myqueue 
     self.ss = search_string 
     self.logger = logging.getLogger("thread".format(thr)) 
     self.thread_num = thr 
     self.lock = l 
     self.p = password 
     self.u = user 
    def run(self): 
     ''' 
     will mount all the drives and start parsing, like a boss 
     ''' 
     def wait_check(dl): 
      if os.path.exists(dl+":\\"): 
       self.logger.info("Mount exists, waiting til it frees to continue") 
       time.sleep(20) 
       wait_check(dl) 
     self.lock.acquire() 
     qi = self.q.get() 
     drive_letter = qi[1] 
     wait_check(drive_letter) 
     direc = qi[0] 
     host = qi[2] 
     u = self.u 
     p = self.p 
     self.logger.debug("Thread: {} folder: {}".format(self.thread_num, direc)) 
     mount_cmd = r"net use {}: {} {} /USER:sea1\\{}".format(drive_letter, direc, p, u) 
     args = shlex.split(mount_cmd) 
     self.logger.debug("Thread: {} mount args: {}".format(self.thread_num, args)) 
     p = subprocess.Popen(args) 
     self.lock.release() 
     time.sleep(1) 
     logFile = os.path.join("{}:\\".format(drive_letter), "Server-app.log") 
     ex_time = [] 
     last_restart = None 
     with open(logFile, encoding="utf-8", errors="ignore", mode="r") as data: 
      for line in data: 
       if line.rstrip("\n") == "System.OutOfMemoryException: Exception of type 'System.OutOfMemoryException' was thrown.": 
        continue 
       line_tokens = line.split() 
       if len(line_tokens) >= 7: 
        if line_tokens[6] == "Service" and line_tokens[7] == "starting...": 
         last_restart = line_tokens[1] 
       if self.ss in line.lower().rstrip("\n"): 
        if len(line_tokens) >= 6: 
         job = line_tokens[1] 
         if job == "Exception" or "," not in job: 
          continue 
         ex_time.append(job) 
      if len(ex_time) > 0: 
       self.logger.info("OOM Exception detected on {} at {}".format(host, ex_time[-1])) 
       if last_restart != None: 
        self.logger.info("last restart on {} at {}".format(host, last_restart)) 
     time.sleep(10) 
     unmount = "net use {}: /DELETE".format(drive_letter) 
     unmntCom = shlex.split(unmount) 
     np = subprocess.Popen(unmntCom) 
     np 
     self.q.task_done() 

srv_log_dir = ["l$", "logs"] 
srv_log = "Service-app.log" 
srv_log_path = os.path.join(rmp_log_dir[0], rmp_log_dir[1], rmp_log) 
odd_list = ["mycoolserver", "mycoolserver1", "mycoolserver3", "mycoolserver4"] 
win_drive_letters = ["e", "f", "g", "h", "i", "j", "k", "l", "m", 
        "n", "o", "p", "q", "r", "s", "t", "u", "v", 
        "w", "x", "y", "z"] 
win_drive_list = [] 
full_log_path = [] 
job_list = [] 
srv_list = [] 
folder_string = r'"\\\\server\l$\logs"' 
len_rzt = len(odd_list) 

for i in range(max_threads): 
    j = i + 1 
    t = ThreadShredder(q, ss.lower(), lock, j, user, password) 
    t.setDaemon(True) 
    t.start() 

def find_drives(): 
    ''' 
    Creates contiguous block of ten drive letters that aren't being used 
    ''' 
    drive_block = set() 
    for i in range(10): 
     if not os.path.exists(win_drive_letters[i]+":\\") and win_drive_letters[i] not in drive_block: 
      drive_block.add(win_drive_letters[i]) 
      logger.debug("{} {}".format(i, drive_block)) 
     elif not os.path.exists(win_drive_letters[i+1]+":\\") and win_drive_letters[i+1] not in drive_block: 
      drive_block.add(win_drive_letters[i+1]) 
      logger.debug("{} {}".format(i, drive_block)) 
     elif not os.path.exists(win_drive_letters[i+2]+":\\") and win_drive_letters[i+2] not in drive_block: 
      drive_block.add(win_drive_letters[i+2]) 
      logger.debug("{} {}".format(i, drive_block)) 
    return sorted(drive_block) 
db = find_drives() 
for elem in db: 
    win_drive_list.append(elem) 
    logger.debug("available drive letters: {}".format(elem)) 
#12 is the number of copy move servers we have 
for i in range(12 + len_rzt): 
    z = i - 12 
    j = i + 1 
    x = i 
    if i > 9 and i < 20: 
     x = i - 10 
    elif i > 19 and i < 30: 
     x = i - 20 
    elif i > 29 and i < 40: 
     x = i - 30 
    if i < 9: 
     srv_list.append("server0{}".format(j)) 
    else: 
     srv_list.append("server{}".format(j)) 
    if i == 9: 
     logger.debug("{} ".format(srv_list[i], win_drive_list[x])) 
    if i < 12: 
     folder = re.sub('server', srv_list[i], folder_string) 
     job_list.append(([folder, win_drive_list[x], rmp_list[i]])) 
    else: 
     folder = re.sub('server', odd_list[z], folder_string) 
     job_list.append(([folder, win_drive_list[x], odd_list[z]])) 
    logger.info(job_list[i]) 

for i, job in enumerate(job_list): 
    q.put(job) 
q.join() 

답변

2

.run() 방법은 큐에서 한 아이템을 처리 한 후 종료한다. 변경 대상 :

while True: 
    job = q.get() 
    # ... 

코드에 다른 많은 문제가 있습니다.

+0

그 줄을 추가했는데 이제는 어떤 스레드도 시작하지 않고 중단됩니다. – user352472

+0

@ user352472 :'# ...'는'.run()'메쏘드의 모든 문장을 나타냅니다. – jfs

+0

뜨거운 망할, 그 일했다. 오, 정말 대단히 감사합니다! 이제 다른 문제가 생겼지 만, 나는 그들을 풀어 보려고 노력할 것이다. – user352472

관련 문제