list(file_obj)
은 fileobj
이 큰 경우 많은 메모리가 필요할 수 있습니다. itertools을 사용하여 필요할 때 줄 바꿈을 제거하여 메모리 요구 사항을 줄일 수 있습니다.
특히, 우리는 처리 가능한 덩어리로 파일을 분할
reader = csv.reader(f)
chunks = itertools.groupby(reader, keyfunc)
을 사용할 수 있으며,
groups = [list(chunk) for key, chunk in itertools.islice(chunks, num_chunks)]
result = pool.map(worker, groups)
한 번에 num_chunks
덩어리에 멀티 풀 작업을 할 수 있습니다.
이렇게하면 전체 파일 대신 메모리에 약간의 (num_chunks
) 청크를 저장할 정도로만 메모리가 필요합니다.
import multiprocessing as mp
import itertools
import time
import csv
def worker(chunk):
# `chunk` will be a list of CSV rows all with the same name column
# replace this with your real computation
# print(chunk)
return len(chunk)
def keyfunc(row):
# `row` is one row of the CSV file.
# replace this with the name column.
return row[0]
def main():
pool = mp.Pool()
largefile = 'test.dat'
num_chunks = 10
results = []
with open(largefile) as f:
reader = csv.reader(f)
chunks = itertools.groupby(reader, keyfunc)
while True:
# make a list of num_chunks chunks
groups = [list(chunk) for key, chunk in
itertools.islice(chunks, num_chunks)]
if groups:
result = pool.map(worker, groups)
results.extend(result)
else:
break
pool.close()
pool.join()
print(results)
if __name__ == '__main__':
main()
내가 선이 상호되지 않습니다 말했을 때 나는 거짓말을
- csv로에서은 (이름 열을 기준으로 분할 할 필요가 열이, 그 이름을 가진 모든 행이 없습니다 헤어질 것). 그러나이 기준에 따라 그룹화 할 수 있다고 생각합니다. 감사! 나는 itertools에 대해 아무것도 몰랐고, 이제는 아무것도 아닌 것보다 조금 더 알았다. – user1040625
원본 코드에 오류가 있습니다. 'pool.apply_async'에 대한 모든 호출은 블로킹 (non-blocking)이므로 전체 파일이 한 번에 큐에 대기 중입니다. 이로 인해 메모리를 절약 할 수 없었을 것입니다. 그래서 한 번에'num_chunks'를 대기열에 넣기 위해 루프를 조금 바 꾸었습니다. 'pool.map'에 대한 호출이 블로킹되어 전체 파일이 한 번에 대기열에 올라가지 않게합니다. – unutbu
@HappyLeapSecond 사용자가 여기에서 메소드를 구현하려고 시도하고 있습니다. http://stackoverflow.com/questions/31164731/python-chunking-csv-file-multiprocessing. 문제가 있습니다. 아마도 당신이 도울 수 있습니까? – m0meni