2015-02-06 2 views
8

일부 시뮬레이션을 수행하는 함수가 있는데 은 배열을 문자열 형식으로 반환합니다.Python 다중 처리 - pool.map 작업의 프로세스 추적

의 입력 매개 변수 값, 가능한 입력 값이 10000 개 이상인 에 대한 시뮬레이션 (함수)을 실행하고 결과를 단일 파일에 기록하려고합니다.

시뮬레이션을 병렬로 실행하려면 다중 처리, 특히 pool.map function 을 사용하고 있습니다.

이상의 시뮬레이션 기능을 실행하는 전체 프로세스가 매우 오랜 시간이 걸리기 때문에 전체 작업 과정을 추적하고 싶습니다.

아래의 현재 코드에서 문제가되는 것은 pool.map이 해당 작업 중 프로세스 추적없이 10000 번 실행되는 것입니다. 일단 병렬 처리가 10000 시뮬레이션 실행을 완료하면 (수 시간이 걸릴 수 있음), 10000 시뮬레이션 결과가 파일에 저장 될 때 추적을 계속합니다. 그래서 pool.map 조작의 처리를 추적하지 않습니다.

프로세스 추적을 허용 할 수있는 간단한 코드가 있습니까?

def simFunction(input): 
    # Does some simulation and outputs simResult 
    return str(simResult) 

# Parallel processing 

inputs = np.arange(0,10000,1) 

if __name__ == "__main__": 
    numCores = multiprocessing.cpu_count() 
    pool = multiprocessing.Pool(processes = numCores) 
    t = pool.map(simFunction, inputs) 
    with open('results.txt','w') as out: 
     print("Starting to simulate " + str(len(inputs)) + " input values...") 
     counter = 0 
     for i in t: 
      out.write(i + '\n') 
      counter = counter + 1 
      if counter%100==0: 
       print(str(counter) + " of " + str(len(inputs)) + " input values simulated") 
    print('Finished!!!!') 

답변

7

반복되는 map 함수를 사용하면 진행 상황을 추적하는 것이 매우 쉽습니다.

>>> from pathos.multiprocessing import ProcessingPool as Pool 
>>> def simFunction(x,y): 
... import time 
... time.sleep(2) 
... return x**2 + y 
... 
>>> x,y = range(100),range(-100,100,2) 
>>> res = Pool().imap(simFunction, x,y) 
>>> with open('results.txt', 'w') as out: 
... for i in x: 
...  out.write("%s\n" % res.next()) 
...  if i%10 is 0: 
...  print "%s of %s simulated" % (i, len(x)) 
... 
0 of 100 simulated 
10 of 100 simulated 
20 of 100 simulated 
30 of 100 simulated 
40 of 100 simulated 
50 of 100 simulated 
60 of 100 simulated 
70 of 100 simulated 
80 of 100 simulated 
90 of 100 simulated 

또는 비동기식 map을 사용할 수 있습니다. 여기서 나는 그것을 조금씩 다르게 할 것이다. 내가 pathos.multiprocessing 대신 multiprocessing을 사용하고

>>> import time 
>>> res = Pool().amap(simFunction, x,y) 
>>> while not res.ready(): 
... print "waiting..." 
... time.sleep(5) 
... 
waiting... 
waiting... 
waiting... 
waiting... 
>>> res.get() 
[-100, -97, -92, -85, -76, -65, -52, -37, -20, -1, 20, 43, 68, 95, 124, 155, 188, 223, 260, 299, 340, 383, 428, 475, 524, 575, 628, 683, 740, 799, 860, 923, 988, 1055, 1124, 1195, 1268, 1343, 1420, 1499, 1580, 1663, 1748, 1835, 1924, 2015, 2108, 2203, 2300, 2399, 2500, 2603, 2708, 2815, 2924, 3035, 3148, 3263, 3380, 3499, 3620, 3743, 3868, 3995, 4124, 4255, 4388, 4523, 4660, 4799, 4940, 5083, 5228, 5375, 5524, 5675, 5828, 5983, 6140, 6299, 6460, 6623, 6788, 6955, 7124, 7295, 7468, 7643, 7820, 7999, 8180, 8363, 8548, 8735, 8924, 9115, 9308, 9503, 9700, 9899] 

참고. 복수 입력을 사용하는 map 함수를 사용하고 훨씬 더 나은 직렬화 기능을 제공하며 map 호출을 (__main__이 아닌) 호출 할 수있게하는 것은 단지 multiprocessing의 포크입니다. multiprocessing을 사용하여 위와 같은 작업을 수행 할 수도 있지만 코드는 매우 약간 다를 수 있습니다.

반복 또는 비동기 map을 사용하면 더 나은 프로세스 추적을 원하는 코드를 작성할 수 있습니다.예를 들어 각 작업에 고유 한 "id"를 전달하고 돌아 오는 것을 관찰하거나 각 작업이 프로세스 ID를 반환하도록합니다. 진행 상황과 프로세스를 추적하는 데는 여러 가지 방법이 있지만 위의 내용은 시작을 제공해야합니다.

현재 pathos를 얻을 수 있습니다 : https://github.com/uqfoundation

+0

정말 고마워요! – user32147

3

"쉬운 수정"이 없습니다. map은 구현 세부 정보를 숨기려합니다. 이 경우에는 이 필요합니다. 세부 사항입니다. 즉, 사물은 정의상 조금 더 복잡해집니다. 통신 패러다임을 바꿔야합니다. 그렇게하는 방법에는 여러 가지가 있습니다.

하나는 결과를 수집하기위한 대기열을 만들고 직원들이 결과를이 대기열에 넣을 수 있도록하는 것입니다. 그런 다음 모니터링 스레드 또는 프로세스 내에서 대기열을보고 들어 오면 결과를 소비 할 수 있습니다. 소비하는 동안이를 분석하고 로그 출력을 생성 할 수 있습니다. 진행 상황을 추적하는 가장 일반적인 방법 일 수 있습니다. 어떤 방식 으로든 수신되는 결과에 실시간으로 응답 할 수 있습니다.

더 간단한 방법은 작업자 함수를 약간 수정하고 거기에 로그 출력을 생성하는 것입니다. 외부 도구 (예 : grepwc)를 사용하여 로그 출력을 신중하게 분석하면 매우 간단하게 추적 할 수 있습니다.

+1

감사합니다. 간단한 예제를 제공해 주시겠습니까? – user32147

3

내가 무엇을 당신이 필요로하는 것은 로그 파일 생각합니다.

로깅 모듈을 Python 표준 라이브러리의 일부로 사용하는 것이 좋습니다. 그러나 유감스럽게도 로깅은 다중 처리 안전하지 않습니다. 따라서 앱에서 즉시 사용할 수는 없습니다.

따라서 다중 처리 안전 로그 처리기를 사용하거나 대기열을 사용하여 구현하거나 로깅 모듈과 함께 잠글 필요가 있습니다.

많은 논의가 Stackoverflow에 있습니다. 이것은 예를 들어 :

import multiprocessing 
import logging 

from random import random 
import time 


logging.basicConfig(
    level=logging.DEBUG, 
    format='%(asctime)s %(process)s %(levelname)s %(message)s', 
    filename='results.log', 
    filemode='a' 
) 


def simulation(a): 
    # logging 
    with multiprocessing.Lock(): 
     logging.debug("Simulating with %s" % a) 

    # simulation 
    time.sleep(random()) 
    result = a*2 

    # logging 
    with multiprocessing.Lock(): 
     logging.debug("Finished simulation with %s. Result is %s" % (a, result)) 

    return result 

if __name__ == '__main__': 

    logging.debug("Starting the simulation") 
    inputs = [x for x in xrange(100)] 
    num_cores = multiprocessing.cpu_count() 
    print "num_cores: %d" % num_cores 
    pool = multiprocessing.Pool(processes=num_cores) 
    t = pool.map(simulation, inputs) 
    logging.debug("The simulation has ended") 

당신은 할 수 있습니다 : CPU 부하의 대부분이 시뮬레이션 기능에 당신이 로그 회전을 사용하지 않을 경우 How should I log while using multiprocessing in Python?

, 당신은 아마이 같은 간단한 잠금 메커니즘을 사용할 수 있습니다 실행시 "tail -f"로그 파일. 다음과 같이 표시되어야합니다.

2015-02-08 18:10:00,616 3957 DEBUG Starting the simulation 
2015-02-08 18:10:00,819 3961 DEBUG Simulating with 12 
2015-02-08 18:10:00,861 3965 DEBUG Simulating with 28 
2015-02-08 18:10:00,843 3960 DEBUG Simulating with 20 
2015-02-08 18:10:00,832 3959 DEBUG Simulating with 16 
2015-02-08 18:10:00,812 3958 DEBUG Simulating with 8 
2015-02-08 18:10:00,798 3963 DEBUG Simulating with 4 
2015-02-08 18:10:00,855 3964 DEBUG Simulating with 24 
2015-02-08 18:10:00,781 3962 DEBUG Simulating with 0 
2015-02-08 18:10:00,981 3961 DEBUG Finished simulation with 12. Result is 24 
2015-02-08 18:10:00,981 3961 DEBUG Simulating with 13 
2015-02-08 18:10:00,991 3958 DEBUG Finished simulation with 8. Result is 16 
2015-02-08 18:10:00,991 3958 DEBUG Simulating with 9 
2015-02-08 18:10:01,130 3964 DEBUG Finished simulation with 24. Result is 48 
2015-02-08 18:10:01,131 3964 DEBUG Simulating with 25 
2015-02-08 18:10:01,134 3964 DEBUG Finished simulation with 25. Result is 50 
2015-02-08 18:10:01,134 3964 DEBUG Simulating with 26 
2015-02-08 18:10:01,315 3961 DEBUG Finished simulation with 13. Result is 26 
2015-02-08 18:10:01,316 3961 DEBUG Simulating with 14 
2015-02-08 18:10:01,391 3961 DEBUG Finished simulation with 14. Result is 28 
2015-02-08 18:10:01,391 3961 DEBUG Simulating with 15 
2015-02-08 18:10:01,392 3963 DEBUG Finished simulation with 4. Result is 8 
2015-02-08 18:10:01,393 3963 DEBUG Simulating with 5 

Windows 및 Linux에서 사용 가능합니다.

희망이 있습니다.

+0

'multiprocessing.get_logger()'는 잠금으로 보호되는 기능 제한 로거를 반환합니다. https://docs.python.org/2/library/multiprocessing.html#logging –

+0

예,하지만 이것은 모듈 로거입니다 ... 그래서 당신이 그것을 사용할 수 있습니다, 당신의 로그는 모듈 수준의 메시지와 혼합됩니다 : 그것을 시도하면 다음과 같은 메시지가 표시됩니다 : 2015-02-08 23 : 47 : 10,954 9288 DEBUG 핸들과 semlock을 만들었습니다 –

+0

오, 당신은 맞아, 나는 그것을 실제로 사용하지 않았고 문서를 너무 빨리 보았다. –

관련 문제