2014-09-10 18 views
2

필자는 최근에 파이썬의 멀티 스레딩 및 멀티 프로세싱 기능을 사용하기 시작했습니다.Python : ProcessPoolExecutor와 함께 외부 대기열을 사용하려면 어떻게해야합니까?

나는 프로듀서/소비자 접근 방식을 사용하여 JSON 로그 파일에서 청크를 읽고, 그 청크를 이벤트로 대기열에 기록한 다음 해당 대기열의 이벤트를 폴링하는 일련의 프로세스를 시작하는 코드를 작성하려고했습니다 (파일 청크) 각각을 처리하여 결과를 인쇄하십시오.

제 의도는 프로세스를 먼저 시작하고 이벤트가 대기열에 들어 오기를 기다리는 것입니다. 예로부터

나는 현재 일부 비트와 조각을 사용하여 작동하는 것 같다이 코드를 사용하고 내가 발견

import re, sys 
from multiprocessing import Process, Queue 

def process(file, chunk): 
    f = open(file, "rb") 
    f.seek(chunk[0]) 
    for entry in pat.findall(f.read(chunk[1])): 
     print(entry) 

def getchunks(file, size=1024*1024): 
    f = open(file, "rb") 
    while True: 
     start = f.tell() 
     f.seek(size, 1) 
     s = f.readline() # skip forward to next line ending 
     yield start, f.tell() - start 
     if not s: 
      break 

def processingChunks(queue): 
    while True: 
     queueEvent = queue.get() 
     if (queueEvent == None): 
      queue.put(None) 
      break 
     process(queueEvent[0], queueEvent[1]) 

if __name__ == "__main__": 
    testFile = "testFile.json" 
    pat = re.compile(r".*?\n") 
    queue = Queue() 

    for w in xrange(6): 
     p = Process(target=processingChunks, args=(queue,)) 
     p.start() 

    for chunk in getchunks(testFile): 
     queue.put((testFile, chunk)) 
     print(queue.qsize()) 
    queue.put(None) 

그러나, 나는 달성하기 위해 concurrent.futures ProcessPoolExecutor을 사용하는 방법을 배우고 싶었다 Future 결과 객체를 사용하여 비동기 방식으로 동일한 결과를 얻습니다.

첫 번째 시도는 다중 처리 관리자를 사용하여 작성된 외부 대기열을 사용하여 수행 한 것으로, 폴링 프로세스로 전달됩니다.

그러나 이것은 작동하지 않는 것 같아요. ProcessPoolExecutor가 작동하도록 설계된 방법이 아니라는 가능성이 있습니다. 내부 대기열을 사용하는 것처럼 보입니다. 그래서 분명 뭔가 잘못하고 있어요 내가 이해하지 못했다 개념에 대해 뭔가가있다,

import concurrent 
from concurrent.futures import as_completed 
import re, sys 
from multiprocessing import Lock, Process, Queue, current_process, Pool, Manager 

def process(file, chunk): 
    entries = [] 
    f = open(file, "rb") 
    f.seek(chunk[0]) 
    for entry in pat.findall(f.read(chunk[1])): 
     entries.append(entry) 
     return entries 

def getchunks(file, size=1024*1024): 
    f = open(file, "rb") 
    while True: 
     start = f.tell() 
     f.seek(size, 1) 
     s = f.readline() # skip forward to next line ending 
     yield start, f.tell() - start 
     if not s: 
      break 

def processingChunks(queue): 
    while True: 
     queueEvent = queue.get() 
     if (queueEvent == None): 
      queue.put(None) 
      break 
     return process(queueEvent[0], queueEvent[1]) 

if __name__ == "__main__": 
    testFile = "testFile.json" 
    pat = re.compile(r".*?\n") 
    procManager = Manager() 
    queue = procManager.Queue() 

    with concurrent.futures.ProcessPoolExecutor(max_workers = 6) as executor: 
     futureResults = [] 
     for i in range(6): 
      future_result = executor.submit(processingChunks, queue) 
      futureResults.append(future_result) 

     for complete in as_completed(futureResults): 
      res = complete.result() 
      for i in res: 
       print(i) 


    for chunk in getchunks(testFile): 
     queue.put((testFile, chunk)) 
     print(queue.qsize()) 
    queue.put(None) 

내가이 어떤 결과를 얻을 수 없어요 :

은이 코드를 사용했다.

내가 어떻게 구현할 수 있는지 이해할 수 있니?

+0

그런데 testFile.json은 각 줄마다 Json 항목이있는 텍스트 파일입니다. – riscado

답변

0

ProcessPoolExecutor을 사용하는 경우 processingChunks 함수를 전혀 사용하지 않거나 가져올 항목을 multiprocessing에서 가져올 필요가 없습니다. 풀은 본질적으로 사용자의 기능이 자동으로 수행되기 전에 수행합니다. 대신, 대기열 및 한 번에 모든 작업을 파견하기 위해이 같은 것을 사용 : 나는 원래 코드가 patprocess에 인수되지 않는으로 어떻게 작동하는지 모르겠어요

with concurrent.futures.ProcessPoolExecutor(max_workers = 6) as executor: 
    executor.map(process, itertools.repeat(testFile), getchunks(testFile)) 

(내가 기대 한 것 모든 작업자 프로세스가 NameError 예외로 실패 할 수 있습니다. 이것이 실제 문제이고 (예제 코드의 유물이 아닌 경우) filechunk (itertools.repeat이 유용 할 수 있음)과 함께 작업자 프로세스로 전달하기 위해 좀 더 수정해야 할 수도 있습니다.

+0

안녕하세요, 답장을 보내 주셔서 감사합니다. 이해하기 때문에 내부 대기열을 사용합니다. 나는 그것을 시도해 주겠다, 도와 줘서 고마워. regexp는 실제로 첫 번째 예제에서 작동하지만 실제로 프로세스에 전달하지는 않습니다 (간단한 변경이 될 수 있지만 첫 번째 코드 스 니펫을 포함하도록 편집합니다). – riscado

+0

또한 비동기 동작의 경우지도 함수는 아마도 timeout = None을 예상해야한다고 생각합니다. – riscado

0

Blckknght 덕분에 누가 답장을 보내면 올바른 방향으로 나를 밀어 냈습니다. 내 첫 질문에 대한 가능한 해결책은 다음과 같습니다.

#!/usr/bin/python 
import concurrent 
from concurrent.futures import as_completed 
import re, sys 

def process(event): 
    entries = [] 
    fl = event[0] 
    chunk = event[1] 
    pat = event[2] 
    f = open(fl, "rb") 
    f.seek(chunk[0]) 
    for entry in pat.findall(f.read(chunk[1])): 
     entries.append(entry) 
    return entries 

def getchunks(file, pat, size=1024*1024): 
    f = open(file, "rb") 
    while True: 
     start = f.tell() 
     f.seek(size, 1) 
     s = f.readline() # skip forward to next line ending 
     yield (file, (start, f.tell() - start), pat) 
     if not s: 
      break 

if __name__ == "__main__": 
    testFile = "testFile.json" 
    pat = re.compile(r".*?\n") 
    results = [] 

    with concurrent.futures.ProcessPoolExecutor() as executor: 
     for res in (executor.submit(process, event) for event in getchunks(testFile, pat)): 
      results.append(res) 

    for complete in as_completed(results): 
     for entry in complete.result(): 
      print('Event result: %s' % entry)  
관련 문제