2013-08-21 2 views
0

python3에서 여러 프로세스를 실행하는 데 문제가 있습니다.python3을 사용한 다중 처리는 한 번만 실행됩니다.

내 프로그램은 다음을 수행합니다 1. 출력에 결과를 sqllite 데이터베이스에서 항목을 가져 와서 2. input_queue 일회성 항목을 여러 프로세스를 만들기 input_queue에 전달하는 기능과 출력을 통해 실행 열. 3. output_queue에서 항목을 가져 와서 스레드를 만듭니다.이 스레드는 분명히 처음 두 단계 전에 시작됩니다.

제 2 단계의 '함수'는 현재 여러 번 실행됩니다. 프로세스 수를 설정합니다. 예를 들어 프로세스 수를 8로 설정하면 8 번만 실행 한 다음 중지합니다. input_queue에서 모든 항목을 지울 때까지 실행을 계속한다고 가정했습니다.

데이터베이스에서 항목을 가져 오는 기능 (1 단계)을 다른 프로세스로 다시 작성한 다음 출력 대기열을 2 단계의 입력 대기열로 전달해야합니까?

편집 : 다음은 코드의 예입니다. 데이터베이스 항목의 대체로 숫자 목록을 사용하여 같은 방식으로 수행합니다. 나는 목록에있는 300 개 항목이와 나는 모두 300 개 항목을 처리하고 싶지만, 지금은 10 (I가 할당 한 프로세스의 수) 처리

#!/usr/bin/python3 
from multiprocessing import Process,Queue 
import multiprocessing 
from threading import Thread 


## This is the class that would be passed to the multi_processing function 
class Processor: 
    def __init__(self,out_queue): 
     self.out_queue = out_queue 
    def __call__(self,in_queue): 
     data_entry = in_queue.get() 
     result = data_entry*2 
     self.out_queue.put(result) 



#Performs the multiprocessing 
def perform_distributed_processing(dbList,threads,processor_factory,output_queue): 
    input_queue = Queue() 


    # Create the Data processors. 
    for i in range(threads): 
     processor = processor_factory(output_queue) 
     data_proc = Process(target = processor, 
          args = (input_queue,)) 

     data_proc.start() 

    # Push entries to the queue. 

    for entry in dbList: 
     input_queue.put(entry) 


    # Push stop markers to the queue, one for each thread. 

    for i in range(threads): 
     input_queue.put(None) 

    data_proc.join() 
    output_queue.put(None) 


if __name__ == '__main__': 
    output_results = Queue() 

    def output_results_reader(queue): 
     while True: 
      item = queue.get() 
      if item is None: 
       break 
      print(item) 


    # Establish results collecting thread. 
    results_process = Thread(target = output_results_reader,args = (output_results,)) 
    results_process.start() 

    # Use this as a substitute for the database in the example 
    dbList = [i for i in range(300)] 

    # Perform multi processing 
    perform_distributed_processing(dbList,10,Processor,output_results) 

    # Wait for it all to finish. 
    results_process.join() 
+0

디버깅을 원하면 코드를 제시해야합니다. 이상적으로 우리에게 [SSCCE] (http : // sscce.org)는 여러분이하려는 일과 잘못되는 일을 보여주기에 충분할 정도로 박탈되었습니다. – abarnert

+0

하지만 당신이 잘못 작성한 코드는 어쨌든 작성 될 필요가 없으며,'multiprocessing.Pool' 또는'concurrent.futures.ProcessPoolExecutor'는 자동으로 필요한 모든 것을 할 것입니다. – abarnert

+0

답장을 보내 주셔서 감사합니다. 질문에 코드를 추가했습니다. 수영장이 내가 가지고있는 것과 함께 작동하는지 확실하지 않습니다. 편집이 필요한 줄은 content_table을 반복하는 것입니다. – Lezan

답변

2

입력 큐를 처리하고 출력 큐에 쓰는 프로세스 모음은 프로세스 풀의 정의입니다.

처음부터 빌드하는 방법을 알고 싶다면 가장 좋은 방법은 source code for multiprocessing.Pool을 보는 것입니다. 이것은 매우 단순하게 파이썬이며 아주 잘 작성되었습니다. 그러나 예상대로 다시 구현하는 대신 multiprocessing.Pool으로 사용할 수 있습니다. 문서의 예제는 매우 훌륭합니다.

하지만 실제로 풀 대신 executor을 사용하면이 작업을 더 간단하게 만들 수 있습니다. 두 모듈의 차이점을 설명하기는 어렵지만 기본적으로 future은 작업을 실행하고 결과를 얻는 다양한 방법을 가진 풀 대신 "스마트"결과 개체입니다. 그냥 미래를 돌려주는 방법을 모르는 바보 같은 일이 필요합니다. 그들은 대신 순서로 와서 당신이 그들을 처리하려는 경우,

from concurrent.futures import ProcessPoolExecutor 

def Processor(data_entry): 
    return data_entry*2 

def perform_distributed_processing(dbList, threads, processor_factory): 
    with ProcessPoolExecutor(processes=threads) as executor: 
     yield from executor.map(processor_factory, dbList) 

if __name__ == '__main__': 
    # Use this as a substitute for the database in the example 
    dbList = [i for i in range(300)] 
    for result in perform_distributed_processing(dbList, 8, Processor): 
     print(result) 

을 (가장 사소한 경우 물론, 코드는 ... 거의 동일 어느 쪽이든 모양) 또는 :

def perform_distributed_processing(dbList, threads, processor_factory): 
    with ProcessPoolExecutor(processes=threads) as executor: 
     fs = (executor.submit(processor_factory, db) for db in dbList) 
     yield from map(Future.result, as_completed(fs)) 

공지 사항을 그 또한 "다음 결과 기다리기"와 "가장 최근의 결과 처리"를 인터리브하는 방법을 제공하기 때문에 처리중인 대기열과 스레드를 바꿨습니다.이 경우 yield (또는이 경우에는 yield from) 모든 것을 복잡하게하지 않고 오버 헤드를 줄이고 잠재적 인 문제를 일으킬 수 있습니다.

+0

정말 고맙습니다. 나는 보지 못했습니다. 새로 고침하는 것을 잊어 버렸던 마지막 답장이 Pool 클래스를 실험하고있었습니다. 이 예는 많은 도움을 주었으며 오늘 저녁 몇 시간 동안 나를 구해 줬습니다. 고맙습니다 ! – Lezan

+0

@Lezan : 풀이 executor보다 쉽게 ​​만들어지는 경우가 있다는 것을 명심하십시오 ('pool.imap'은 future를 다루는 코드의 세 줄보다 간단합니다). 그리고'pool.map'이하는 chunking/batching 거의 작동하지 않을 때마다 자동으로 큰 성능 향상을 얻을 수 있으므로 차이점을 이해하고 그 차이를 이해할 가치가 있습니다. – abarnert

+0

executor.map 데이터베이스가 더 잘 작동하는 것처럼 보였으므로이를 사용할 것입니다. 나는 오늘 아침까지 Pools에 대해서조차 모르고 있었기 때문에 이것은 매우 배움의 경험이었습니다. – Lezan

2

전체를 다시 작성하려고하지 마십시오 다중 처리 라이브러리. 난 당신의 요구에 따라 multiprocessing.Pool 방법 중 하나를 사용할 수있는 것 같아요 - 이것은 배치 작업을 경우에도 동기 multiprocessing.Pool.map()을 사용할 수 있습니다 - 입력 대기열로 밀어 대신에, 당신은 스레드에 입력을 생성하는 발전기를 작성해야합니다.

+0

감사합니다. 지금 풀을 들여다보고 있습니다. 하지만 내 문제는 다른 프로세스가 입력 (위의 편집 된 코드)으로 사용하기 위해 대기열에 추가하는 것이고 Pool을 사용하고 있다면 여전히 작동할까요? – Lezan

+0

@Lezan : 큐를 입력으로 사용해야하는 다른 '프로세스'는 아니며, 필요로하는 _task_입니다. 그래서 그냥'apply' 함수를 인자로 넘겨 주면됩니다. (또는'map'에 전달한 함수를 감싸기 위해 클로저를 만들면됩니다). – abarnert

+0

@Lezan 물론 모든 사람이 대기열을 공유하면 '초기화 프로그램'에있는 모든 프로세스로 밀어 넣을 수 있습니다.하지만 이는 단지 최적화 일뿐입니다. – abarnert