2017-12-31 21 views
0

multprocess.Pool.apply_async로 작업하는 단일 파일에 로깅 할 수 없습니다. Logging Cookbook에서 this 예제를 적용하려고 시도했지만 multiprocessing.Process에서만 작동합니다. 로깅 큐를 apply_async에 전달하는 것은 효과가없는 것 같습니다. 풀을 사용하여 동시 스레드 수를 쉽게 관리 할 수 ​​있습니다.다중 처리로 단일 파일에 로그온하는 방법 .Pool.apply_async

다음은 멀티 프로세스를 사용하는 예입니다. 프로세스는 주 프로세스의 로그 메시지를받지 못한다는 점을 제외하고 나에게 잘 작동하며, 큰 작업이 100 개있을 때 제대로 작동하지 않는다고 생각합니다. multiprocessing.Manager, multiprocessing.Queue, multiprocessing.get_logger, apply_async.get를 사용하여, 나는 많은 약간의 변화를 시도했습니다

def main_with_pool(): 
    start_time = time.time() 
    queue = multiprocessing.Queue(-1) 
    listener = multiprocessing.Process(target=listener_process, 
             args=(queue, listener_configurer)) 
    listener.start() 
    pool = multiprocessing.Pool(processes=3) 
    job_list = [np.random.randint(10)/2 for i in range(10)] 
    single_thread_time = np.sum(job_list) 
    for i, sleep_time in enumerate(job_list): 
     name = str(i) 
     pool.apply_async(worker_function, 
         args=(sleep_time, name, queue, worker_configurer)) 

    queue.put_nowait(None) 
    listener.join() 
    end_time = time.time() 
    print("Script execution time was {}s, but single-thread time was {}s".format(
     (end_time - start_time), 
     single_thread_time 
    )) 

if __name__ == "__main__": 
    main_with_pool() 

:

import logging 
import logging.handlers 
import numpy as np 
import time 
import multiprocessing 
import pandas as pd 
log_file = 'PATH_TO_FILE/log_file.log' 

def listener_configurer(): 
    root = logging.getLogger() 
    h = logging.FileHandler(log_file) 
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s') 
    h.setFormatter(f) 
    root.addHandler(h) 

# This is the listener process top-level loop: wait for logging events 
# (LogRecords)on the queue and handle them, quit when you get a None for a 
# LogRecord. 
def listener_process(queue, configurer): 
    configurer() 
    while True: 
     try: 
      record = queue.get() 
      if record is None: # We send this as a sentinel to tell the listener to quit. 
       break 
      logger = logging.getLogger(record.name) 
      logger.handle(record) # No level or filter logic applied - just do it! 
     except Exception: 
      import sys, traceback 
      print('Whoops! Problem:', file=sys.stderr) 
      traceback.print_exc(file=sys.stderr) 


def worker_configurer(queue): 
    h = logging.handlers.QueueHandler(queue) # Just the one handler needed 
    root = logging.getLogger() 
    root.addHandler(h) 
    # send all messages, for demo; no other level or filter logic applied. 
    root.setLevel(logging.DEBUG) 


# This is the worker process top-level loop, which just logs ten events with 
# random intervening delays before terminating. 
# The print messages are just so you know it's doing something! 
def worker_function(sleep_time, name, queue, configurer): 
    configurer(queue) 
    start_message = 'Worker {} started and will now sleep for {}s'.format(name, sleep_time) 
    logging.info(start_message) 
    time.sleep(sleep_time) 
    success_message = 'Worker {} has finished sleeping for {}s'.format(name, sleep_time) 
    logging.info(success_message) 

def main_with_process(): 
    start_time = time.time() 
    single_thread_time = 0. 
    queue = multiprocessing.Queue(-1) 
    listener = multiprocessing.Process(target=listener_process, 
             args=(queue, listener_configurer)) 
    listener.start() 
    workers = [] 
    for i in range(10): 
     name = str(i) 
     sleep_time = np.random.randint(10)/2 
     single_thread_time += sleep_time 
     worker = multiprocessing.Process(target=worker_function, 
             args=(sleep_time, name, queue, worker_configurer)) 
     workers.append(worker) 
     worker.start() 
    for w in workers: 
     w.join() 
    queue.put_nowait(None) 
    listener.join() 
    end_time = time.time() 
    final_message = "Script execution time was {}s, but single-thread time was {}s".format(
     (end_time - start_time), 
     single_thread_time 
    ) 
    print(final_message) 

if __name__ == "__main__": 
    main_with_process() 

하지만 다음 적응이 동작하지 않습니다(),하지만 일을하지 못했습니다.

나는 이것을위한 기성품 해결책이있을 것이라고 생각합니다. 대신 샐러리를 시도해야합니까?

감사합니다.

+0

torek의 조언에 따라이 문제를 해결했습니다. 나는 github에 대한 [예제] (https://github.com/ClayCampaigne/multiprocessing-pool-logging/blob/master/pool_logging.py)를 가지고 있습니다. – GrayOnGray

답변

1

가 얽혀 여기에 두 개의 별도의 문제, 사실이 있습니다

  • 당신은 풀 기반의 함수에 인수로 multiprocessing.Queue() 개체를 전달할 수는 (당신이 직접 시작 노동자에 전달할 수 있지만, 그 당시의 "더 멀리있는"것이 아닙니다).
  • None을 수신기 프로세스로 보내기 전에 모든 비동기 작업자가 완료 될 때까지 기다려야합니다. 함께

    queue = multiprocessing.Queue(-1) 
    

    : 관리자 관리 Queue() 인스턴스 같은

    queue = multiprocessing.Manager().Queue(-1) 
    

    가 통과 할 수

대체, 첫 번째 문제를 해결한다.

pool.close() 
pool.join() 
queue.put_nowait(None) 

또는 더 복잡한 :

는 예를 들어, 두 번째를 수정하거나 각 비동기 호출에서 각 결과를 수집하거나 풀을 닫고 기다릴

getters = [] 
for i, sleep_time in enumerate(job_list): 
    name = str(i) 
    getters.append(
     pool.apply_async(worker_function, 
        args=(sleep_time, name, queue, worker_configurer)) 
    ) 
while len(getters): 
    getters.pop().get() 
# optionally, close and join pool here (generally a good idea anyway) 
queue.put_nowait(None) 

(당신 put_nowaitput 대기 버전으로 바꾸고 무제한 대기열을 사용하지 않는 것이 좋습니다.

+0

감사! 나는 남은 문제가있다. 작업자가 작업을 집을 때마다, 각 로깅 이벤트에 대해 보낸 indentical 메시지의 수는 하나씩 증가한다. 'worker_configurer' 함수에서 핸들러를 추가하기 전에'if not len ​​(root.handlers) :'조건을 추가하려고 시도했지만 아무것도하지 않았습니다. 지금이 질문 [다중 처리 광기 로깅] (https://stackoverflow.com/questions/20332359/logging-with-multiprocessing-madness)을 참조하고 있습니다. (나는'Process' 설정에서 코드를 적용하고 있기 때문에 일부 함수가 잘못 명명 된 것을 깨닫습니다.) – GrayOnGray

+0

키워드 인자'maxtasksperchild = 1'을 사용하여 풀을 초기화하여 나머지 작업자를 해결할 수 있도록 나머지 작업을 해결했습니다 각 작업이 새 작업자를 생성합니다. 문제의 유사성에도 불구하고 나는 [다중 처리 광란과 함께 로깅] (https://stackoverflow.com/questions/20332359/logging-with-multiprocessing-madness)과 그 해답에서 사용할 가치가 있다고 생각하는 것을 찾지 못했습니다. – GrayOnGray

+1

기본적으로 약간의 까다 롭습니다. 특히 각자의 프로세스가 원래의 부모와 독립적이기 때문에, 이식성이 필요한 경우에는 특히 그렇습니다. 그러나 비 Windows 시스템에서는 각 프로세스가 로거 구성을 포함하여 원래의 부모 상태로 시작됩니다. 로깅 모듈은 루트 로깅 인스턴스에 대한 일종의 싱글 톤으로 작동하도록 설계되었습니다. 한편, 핸들러는 스택 가능하고 따라서 싱글 톤이 아닙니다. 그래서 당신은 각 노동자들에게 미친 퀼트 가능성을 부여합니다. sayan의 답변 - 한 지점에서 로깅을하는 것이 훨씬 쉽습니다. – torek

1

두 개의 대기열을 사용하는 것을 고려해보십시오. 첫 번째 대기열은 작업자를위한 데이터를 저장하는 곳입니다. 작업 완료 후 각 작업자는 결과를 두 번째 대기열로 푸시합니다. 이제이 두 번째 대기열을 사용하여 로그를 파일에 기록하십시오.

+0

이것은 이해하기 쉽고 관리가 쉬운 것처럼 보입니다 - 로깅과 멀티 프로세싱의 상호 작용을 이해할 수 있을지 모르겠지만 궁극적으로는 궁극적으로 저는 꽤 긴 멀티 파트 알고리즘을 병렬 처리 할 것이기 때문에 잠재적으로 제한적입니다. 다른 용도로 만들어 졌기 때문에 알고리즘의 여러 부분에 특정한 로깅을 보존하려고합니다. – GrayOnGray

관련 문제