2013-08-16 3 views
1

subprocess.Popen()을 사용하여 명령 줄에서 실행되는 프로그램에서 회귀 테스트를 실행하기 위해 python 스크립트 (cygwin 및 Linux 환경 용)를 작성하고 있습니다. 기본적으로, 나는 일련의 작업을 가지고 있는데, 그 중 일부는 개발자의 필요에 따라 실행될 필요가있다 (10-1000 정도). 각 작업은 완료하는 데 몇 초에서 20 분 정도 소요될 수 있습니다.Python의 다중 처리 풀에서 작업을 동적으로 재정렬

여러 작업에서 성공적으로 작업을 수행하지만 이전 작업을 지능적으로 정렬하여 긴 작업을 먼저 실행하여 시간을 절약하려고합니다. 합병증은 일부 작업 (정상 상태 계산)이 다른 작업보다 먼저 실행되어야한다는 것입니다 (정상 상태로 결정된 초기 조건을 기반으로 한 과도 현상).

이 처리 방법은 상위 작업과 모든 하위 작업을 동일한 프로세스에서 반복적으로 실행하는 것이지만 일부 작업에는 여러 개의 장기 실행 하위가 있습니다. 상위 작업이 완료되면 하위 프로세스를 풀에 다시 추가하려고하지만 대기열의 헤드에 추가해야합니다. 나는 다중 처리로 이것을 할 수 있을지 확신하지 못한다. 바보. Manager와 함께 예를 살펴 보았지만 모두 네트워킹에 기반을두고 있으며 특히 적용 할 수 없습니다. 코드 형태의 도움이나 멀티 프로세싱에 대한 좋은 자습서 링크 (내가 봤으니 ...)는 많이 감사하겠습니다. 지금까지 내가 가지고있는 것에 대한 코드의 해골이 있는데, 다른 프로세서에서 생성하고자하는 자식 작업을 지적했다.

import multiprocessing 
import subprocess 

class Job(object): 
    def __init__(self, popenArgs, runTime, children) 
    self.popenArgs = popenArgs #list to be fed to popen 
    self.runTime = runTime #Approximate runTime for the job 
    self.children = children #Jobs that require this job to run first 

def runJob(job): 
    subprocess.Popen(job.popenArgs).wait() 
    #################################################### 
    #I want to remove this, and instead kick these back to the pool 
    for j in job.children: 
    runJob(j) 
    #################################################### 

def main(jobs): 
    # This jobs argument contains only jobs which are ready to be run 
    # ie no children, only parent-less jobs 
    jobs.sort(key=lambda job: job.runTime, reverse=True) 
    multiprocessing.Pool(4).map(runJob, jobs) 
+0

무작위 참고 사항 : 여기서는 다중 처리를 사용하지 않는 것이 좋습니다. 스레드를 사용하거나 프로세스를 시작하고 대기하는 전통적인 방법 ('subprocess' 모듈 및'os.wait()')으로 동일한 결과를 얻을 수 있습니다. –

답변

0

첫째, 나에게 두 번째 아민 리고의 설명을 보자 : 다중 스레드 대신 여기에 여러 프로세스를 사용할 이유가 없습니다. 제어 프로세스에서 하위 프로세스를 기다리는 데 대부분의 시간을 소비합니다. 당신은 CPU 집약적 인 병렬 작업을 할 필요가 없습니다.

스레드를 사용하면 주요 문제를 쉽게 해결할 수 있습니다. 지금은 다른 작업의 속성 인 암시 적 종속성 그래프에 작업을 저장하고 있습니다. 스케줄링 측면에서 작업을 정렬하는 별도의 데이터 구조가 필요합니다. 또한 각 작업 트리는 현재 하나의 작업자 프로세스에 연결되어 있습니다. 작업자가 작업을 수행하는 데 사용하는 데이터 구조에서 작업자를 분리하려고합니다. 그런 다음 작업자는 각각 동일한 작업 대기열에서 작업을 가져옵니다. 작업자가 작업을 마친 후에는 해당 작업의 하위 작업을 대기열에 추가하고 사용 가능한 모든 작업자가 처리 할 수 ​​있습니다.

부모 작업이 끝나면 줄의 맨 앞에 자식 작업을 삽입하려면 스택 모양의 컨테이너가 필요에 맞게 보일 것입니다. Queue 모듈은 사용할 수있는 스레드 안전 LifoQueue 클래스를 제공합니다.

import threading 
import subprocess 
from Queue import LifoQueue 

class Job(object): 
    def __init__(self, popenArgs, runTime, children): 
    self.popenArgs = popenArgs 
    self.runTime = runTime 
    self.children = children 

def run_jobs(queue): 
    while True: 
    job = queue.get() 
    subprocess.Popen(job.popenArgs).wait() 
    for child in job.children: 
     queue.put(child) 
    queue.task_done() 

# Parameter 'jobs' contains the jobs that have no parent. 
def main(jobs): 
    job_queue = LifoQueue() 
    num_workers = 4 
    jobs.sort(key=lambda job: job.runTime) 
    for job in jobs: 
    job_queue.put(job) 
    for i in range(num_workers): 
    t = threading.Thread(target=run_jobs, args=(job_queue,)) 
    t.daemon = True 
    t.start() 
    job_queue.join() 

노트의 몇

: 모든 작업이 작업자 스레드를 모니터링하여 수행 할 때 수행 할 작업을 추적하지 않기 때문에 (1) 우리는 알 수 없다. 큐의 직업입니다. 따라서 메인 쓰레드는 큐 객체를 모니터하여 모든 작업이 언제 완료되는지를 알 수 있습니다 ( job_queue.join()). 따라서 작업자 스레드를 데몬 스레드로 표시 할 수 있으므로 작업자를 기다리지 않고 주 스레드가 수행 할 때마다 프로세스가 종료됩니다. 따라서 우리는 주 스레드와 작업자 스레드 사이의 통신이 필요 없어 후자가 루프에서 빠져 나와 중단 할 때를 알 수 있습니다.

(2) 대기열에 포함 된 모든 작업이 완료된 것으로 표시된 경우 (특히 대기열에 들어간 항목 수와 동일한 횟수만큼 task_done()이 호출되었을 때) 작업이 완료되었음을 알 수 있습니다. 모든 작업이 완료된 상태로 큐의 빈 상태를 사용하는 것은 신뢰할 수 없습니다. 대기열은 작업을 팝핑하고 해당 작업의 하위를 대기열에 넣는 것 사이에 일시적으로 오도 된 것일 수 있습니다.

+0

나는 이것을했다고 생각한다. 시험을 마치면 수표를 주겠다. 도와 줘서 고마워! – joshindc

관련 문제