2012-01-03 7 views
15

매우 큰 csv 파일 (64MB에서 500MB)을 에 넣고 줄 단위로 작업 한 다음 작은 고정 크기 파일을 출력하는 다중 처리를 사용하여 응용 프로그램을 병렬 처리하려고합니다.다중 처리를 위해 대용량 파일에서 데이터 청크?

은 현재 내가 불행하게도 메모리에 완전히 로드되는 list(file_obj)을 수행 (내 생각) 나는 그때 n 개의 부분으로 그 목록을 휴식, n은 내가 실행하려는 프로세스의 수있는. 그런 다음 pool.map()을 깨진 목록에 표시합니다.

이 방법은 단일 스레드 된 파일 열기 및 반복 방식 방법과 비교할 때 실제로 매우 나쁜 런타임을 갖고있는 것으로 보입니다. 누군가 가 더 나은 해결책을 제시 할 수 있습니까?

또한, 특정 열의 값을 으로 보존하는 그룹으로 파일의 행을 처리해야합니다. 이러한 행 그룹 자체는 균등 분할 될 수 있지만 그룹은이 열에 둘 이상의 값을 포함해야합니다.

답변

14

list(file_obj)fileobj이 큰 경우 많은 메모리가 필요할 수 있습니다. itertools을 사용하여 필요할 때 줄 바꿈을 제거하여 메모리 요구 사항을 줄일 수 있습니다.

특히, 우리는 처리 가능한 덩어리로 파일을 분할

reader = csv.reader(f) 
chunks = itertools.groupby(reader, keyfunc) 

을 사용할 수 있으며,

groups = [list(chunk) for key, chunk in itertools.islice(chunks, num_chunks)] 
result = pool.map(worker, groups) 

한 번에 num_chunks 덩어리에 멀티 풀 작업을 할 수 있습니다.

이렇게하면 전체 파일 대신 메모리에 약간의 (num_chunks) 청크를 저장할 정도로만 메모리가 필요합니다.


import multiprocessing as mp 
import itertools 
import time 
import csv 

def worker(chunk): 
    # `chunk` will be a list of CSV rows all with the same name column 
    # replace this with your real computation 
    # print(chunk) 
    return len(chunk) 

def keyfunc(row): 
    # `row` is one row of the CSV file. 
    # replace this with the name column. 
    return row[0] 

def main(): 
    pool = mp.Pool() 
    largefile = 'test.dat' 
    num_chunks = 10 
    results = [] 
    with open(largefile) as f: 
     reader = csv.reader(f) 
     chunks = itertools.groupby(reader, keyfunc) 
     while True: 
      # make a list of num_chunks chunks 
      groups = [list(chunk) for key, chunk in 
         itertools.islice(chunks, num_chunks)] 
      if groups: 
       result = pool.map(worker, groups) 
       results.extend(result) 
      else: 
       break 
    pool.close() 
    pool.join() 
    print(results) 

if __name__ == '__main__': 
    main() 
내가 선이 상호되지 않습니다 말했을 때 나는 거짓말을
+0

- csv로에서은 (이름 열을 기준으로 분할 할 필요가 열이, 그 이름을 가진 모든 행이 없습니다 헤어질 것). 그러나이 기준에 따라 그룹화 할 수 있다고 생각합니다. 감사! 나는 itertools에 대해 아무것도 몰랐고, 이제는 아무것도 아닌 것보다 조금 더 알았다. – user1040625

+0

원본 코드에 오류가 있습니다. 'pool.apply_async'에 대한 모든 호출은 블로킹 (non-blocking)이므로 전체 파일이 한 번에 큐에 대기 중입니다. 이로 인해 메모리를 절약 할 수 없었을 것입니다. 그래서 한 번에'num_chunks'를 대기열에 넣기 위해 루프를 조금 바 꾸었습니다. 'pool.map'에 대한 호출이 블로킹되어 전체 파일이 한 번에 대기열에 올라가지 않게합니다. – unutbu

+0

@HappyLeapSecond 사용자가 여기에서 메소드를 구현하려고 시도하고 있습니다. http://stackoverflow.com/questions/31164731/python-chunking-csv-file-multiprocessing. 문제가 있습니다. 아마도 당신이 도울 수 있습니까? – m0meni

1

나는 그것을 간단하게 유지할 것이다. 단일 프로그램에서 파일을 열고 한 줄씩 읽으십시오. 분할 할 파일 수를 선택하고 많은 출력 파일을 열며 모든 파일을 다음 파일에 기록 할 수 있습니다. 그러면 파일이 n 개의 부분으로 분할됩니다. 그런 다음 각 파일에 대해 병렬로 Python 프로그램을 실행할 수 있습니다.

관련 문제