2016-11-22 1 views
5

현재 각 항목에 대해 계산 집약적 인 행렬 곱셈을해야하는 큰 파일 (8 천만 행)을 읽으려고합니다. 이 계산 후, 결과를 데이터베이스에 삽입하고 싶습니다. 이 프로세스의 시간 집중적 인 방식 때문에 프로세스 속도를 높이기 위해 파일을 여러 코어로 분할하려고합니다.파이썬 : 다중 코어를 사용하는 파일 처리

연구를 마친 후 파일을 n 부분으로 나눈이 유망한 시도를 발견했습니다. 이 작동하는 동안 나는이 사용하는 다중 병렬 문제로 실행

if __name__ == '__main__': 
    fp = open(filename) 
    number_of_chunks = 4 
    for chunk_number in range(number_of_chunks): 
     print chunk_number, 100 * '=' 
     for line in file_block(fp, number_of_chunks, chunk_number): 
      process(line) 

:

def file_block(fp, number_of_blocks, block): 
    ''' 
    A generator that splits a file into blocks and iterates 
    over the lines of one of the blocks. 

    ''' 

    assert 0 <= block and block < number_of_blocks 
    assert 0 < number_of_blocks 

    fp.seek(0,2) 
    file_size = fp.tell() 

    ini = file_size * block/number_of_blocks 
    end = file_size * (1 + block)/number_of_blocks 

    if ini <= 0: 
     fp.seek(0) 
    else: 
     fp.seek(ini-1) 
     fp.readline() 

    while fp.tell() < end: 
     yield fp.readline() 

반복적이 같은 함수를 호출 할 수있는 오류의 존재와

fp = open(filename) 
number_of_chunks = 4 
li = [file_block(fp, number_of_chunks, chunk_number) for chunk_number in range(number_of_chunks)] 

p = Pool(cpu_count() - 1) 
p.map(processChunk,li) 

을, 그 발전기는 절임 될 수 없다.

이 오류를 이해하는 동안 모든 파일을 처음부터 반복하여 모든 줄을 목록에 넣는 것은 너무 비쌉니다. (일반적인지도 방법을 사용하는 경우 대신 1로) 데이터베이스에

감사를 한 번에 여러 행을 삽입하는 것이 더 효율적이기 때문에

은 또한, 나는, 반복 당 코어 당 선 블록을 사용하려면 너의 도움으로.

+3

큰 파일의 초기 통과를 통해 탐색 좌표와 해당 위치에서 읽을 행 수를 기록 할 수 있습니다. 이 두 숫자를 사용하여 멀티 프로세싱을 호출하고 발전기를 각 프로세스에 끼워 넣을 수 있습니다. – kezzos

+0

먼저 파일을 네 개의 파일로 분할 할 수 있습니까? – cwallenpoole

+0

파일 열기 및'file_block' 코드를 스레드가 시작되기 전에 초기화하는 대신 각 스레드로 이동하십시오. 읽기만 가능한 한 파일을 한 번이 아닌 네 번 열어도 문제가되지 않습니다. –

답변

3

발전기를 앞쪽으로 만들고 각 스레드에 전달하는 대신 스레드 코드에 남겨 둡니다.

def processChunk(params): 
    filename, chunk_number, number_of_chunks = params 
    with open(filename, 'r') as fp: 
     for line in file_block(fp, number_of_chunks, chunk_number): 
      process(line) 

li = [(filename, i, number_of_chunks) for i in range(number_of_chunks)] 
p.map(processChunk, li) 
관련 문제