2012-05-29 3 views
0

나는 수십만 줄의 파일이 있는데, 각 줄마다 동일한 프로세스 (공분산 계산)를 거쳐야합니다. 꽤 오래 걸리므로 멀티 스레드로 가고있었습니다. 그러나 내가 본 모든 예제/튜토리얼은 내가하고 싶은 일에 대해 상당히 복잡했습니다. 누구든지 두 모듈을 함께 사용하는 방법을 설명하는 좋은 자습서를 가르쳐 줄 수 있다면 좋을 것입니다. 내가 병렬로 일을 처리 할 때마다파이썬에서 스레딩 및 큐 모듈을 사용하는 멀티 스레딩

답변

0

, 나는이 비슷한을 사용 (난 그냥 기존 스크립트에서이 찢어진) : 기본적으로

#!/usr/bin/env python2 
# This Python file uses the following encoding: utf-8 

import os, sys, time 
from multiprocessing import Queue, Manager, Process, Value, Event, cpu_count 

class ThreadedProcessor(object): 
    def __init__(self, parser, input_file, output_file, threads=cpu_count()): 
    self.parser = parser 

    self.num_processes = threads 
    self.input_file = input_file 
    self.output_file = output_file 

    self.shared_proxy = Manager() 

    self.input_queue = Queue() 
    self.output_queue = Queue() 

    self.input_process = Process(target=self.parse_input) 
    self.output_process = Process(target=self.write_output) 

    self.processes = [Process(target=self.process_row) for i in range(self.num_processes)] 

    self.input_process.start() 
    self.output_process.start() 

    for process in self.processes: 
     process.start() 

    self.input_process.join() 

    for process in self.processes: 
     process.join() 

    self.output_process.join() 

    def parse_input(self): 
    for index, row in enumerate(self.input_file): 
     self.input_queue.put([index, row]) 

    for i in range(self.num_processes): 
     self.input_queue.put('STOP') 

    def process_row(self): 
    for index, row in iter(self.input_queue.get, 'STOP'): 
     self.output_queue.put([index, row[0], self.parser.parse(row[1])]) 

    self.output_queue.put('STOP') 

    def write_output(self): 
    current = 0 
    buffer = {} 

    for works in range(self.num_processes): 
     for index, id, row in iter(self.output_queue.get, 'STOP'): 
     if index != current: 
      buffer[index] = [id] + row 
     else: 
      self.output_file.writerow([id] + row) 
      current += 1 

      while current in buffer: 
      self.output_file.writerow(buffer[current]) 
      del buffer[current] 
      current += 1 

, 당신의 읽기/쓰기 관리하는 두 개의 프로세스가 파일. 하나는 입력을 읽고 파싱하고, 다른 하나는 "done"큐에서 읽고 출력 파일에 씁니다. 다른 프로세스가 생성됩니다 (이 경우 숫자는 CPU에있는 총 프로세서 코어 수와 동일합니다). 그리고 모든 프로세스가 입력 대기열의 요소를 처리합니다.