필자는 최근에 파이썬의 멀티 스레딩 및 멀티 프로세싱 기능을 사용하기 시작했습니다.Python : ProcessPoolExecutor와 함께 외부 대기열을 사용하려면 어떻게해야합니까?
나는 프로듀서/소비자 접근 방식을 사용하여 JSON 로그 파일에서 청크를 읽고, 그 청크를 이벤트로 대기열에 기록한 다음 해당 대기열의 이벤트를 폴링하는 일련의 프로세스를 시작하는 코드를 작성하려고했습니다 (파일 청크) 각각을 처리하여 결과를 인쇄하십시오.
제 의도는 프로세스를 먼저 시작하고 이벤트가 대기열에 들어 오기를 기다리는 것입니다. 예로부터
나는 현재 일부 비트와 조각을 사용하여 작동하는 것 같다이 코드를 사용하고 내가 발견
import re, sys
from multiprocessing import Process, Queue
def process(file, chunk):
f = open(file, "rb")
f.seek(chunk[0])
for entry in pat.findall(f.read(chunk[1])):
print(entry)
def getchunks(file, size=1024*1024):
f = open(file, "rb")
while True:
start = f.tell()
f.seek(size, 1)
s = f.readline() # skip forward to next line ending
yield start, f.tell() - start
if not s:
break
def processingChunks(queue):
while True:
queueEvent = queue.get()
if (queueEvent == None):
queue.put(None)
break
process(queueEvent[0], queueEvent[1])
if __name__ == "__main__":
testFile = "testFile.json"
pat = re.compile(r".*?\n")
queue = Queue()
for w in xrange(6):
p = Process(target=processingChunks, args=(queue,))
p.start()
for chunk in getchunks(testFile):
queue.put((testFile, chunk))
print(queue.qsize())
queue.put(None)
그러나, 나는 달성하기 위해 concurrent.futures ProcessPoolExecutor을 사용하는 방법을 배우고 싶었다 Future 결과 객체를 사용하여 비동기 방식으로 동일한 결과를 얻습니다.
첫 번째 시도는 다중 처리 관리자를 사용하여 작성된 외부 대기열을 사용하여 수행 한 것으로, 폴링 프로세스로 전달됩니다.
그러나 이것은 작동하지 않는 것 같아요. ProcessPoolExecutor가 작동하도록 설계된 방법이 아니라는 가능성이 있습니다. 내부 대기열을 사용하는 것처럼 보입니다. 그래서 분명 뭔가 잘못하고 있어요 내가 이해하지 못했다 개념에 대해 뭔가가있다,
import concurrent
from concurrent.futures import as_completed
import re, sys
from multiprocessing import Lock, Process, Queue, current_process, Pool, Manager
def process(file, chunk):
entries = []
f = open(file, "rb")
f.seek(chunk[0])
for entry in pat.findall(f.read(chunk[1])):
entries.append(entry)
return entries
def getchunks(file, size=1024*1024):
f = open(file, "rb")
while True:
start = f.tell()
f.seek(size, 1)
s = f.readline() # skip forward to next line ending
yield start, f.tell() - start
if not s:
break
def processingChunks(queue):
while True:
queueEvent = queue.get()
if (queueEvent == None):
queue.put(None)
break
return process(queueEvent[0], queueEvent[1])
if __name__ == "__main__":
testFile = "testFile.json"
pat = re.compile(r".*?\n")
procManager = Manager()
queue = procManager.Queue()
with concurrent.futures.ProcessPoolExecutor(max_workers = 6) as executor:
futureResults = []
for i in range(6):
future_result = executor.submit(processingChunks, queue)
futureResults.append(future_result)
for complete in as_completed(futureResults):
res = complete.result()
for i in res:
print(i)
for chunk in getchunks(testFile):
queue.put((testFile, chunk))
print(queue.qsize())
queue.put(None)
내가이 어떤 결과를 얻을 수 없어요 :
은이 코드를 사용했다.내가 어떻게 구현할 수 있는지 이해할 수 있니?
그런데 testFile.json은 각 줄마다 Json 항목이있는 텍스트 파일입니다. – riscado