2012-03-19 2 views
2

거대한 파일이있어서 그것을 읽고 프로세스해야합니다.파이썬에서 스레딩을 활용하여 큰 파일을 파싱하는 방법은 무엇입니까?

with open(source_filename) as source, open(target_filename) as target: 
    for line in source: 
     target.write(do_something(line)) 

    do_something_else() 

스레드를 사용하여 가속화 할 수 있습니까? 한 줄에 하나의 쓰레드를 생성한다면 엄청난 오버 헤드 비용이 들까 요?

편집 :이 질문을 토론하지 않으려면 코드는 어떻게 생겼습니까?

with open(source_filename) as source, open(target_filename) as target: 
    ? 

는 @Nicoretti는 : 반복에서 나는 데이터의 몇 KB의 라인을 읽을 필요가있다.

업데이트 2 : 파이썬 풀고 기다려야 할 수 있도록이 파일이하는 BZ2 수 있습니다 :

$ bzip2 -d country.osm.bz2 | ./my_script.py 
+0

일부 처리를 수행하기 위해 얼마만큼의 데이터를 읽어야합니까? – Nicoretti

답변

7

읽기, 처리 및 쓰기의 세 가지 스레드를 사용할 수 있습니다. 가능한 이점은 I/O를 기다리는 동안 처리가 발생할 수 있지만 상황에 실제 이점이 있는지 확인하기 위해 몇 가지 타이밍을 취해야합니다.

import threading 
import Queue 

QUEUE_SIZE = 1000 
sentinel = object() 

def read_file(name, queue): 
    with open(name) as f: 
     for line in f: 
      queue.put(line) 
    queue.put(sentinel) 

def process(inqueue, outqueue): 
    for line in iter(inqueue.get, sentinel): 
     outqueue.put(do_something(line)) 
    outqueue.put(sentinel) 

def write_file(name, queue): 
    with open(name, "w") as f: 
     for line in iter(queue.get, sentinel): 
      f.write(line) 

inq = Queue.Queue(maxsize=QUEUE_SIZE) 
outq = Queue.Queue(maxsize=QUEUE_SIZE) 

threading.Thread(target=read_file, args=(source_filename, inq)).start() 
threading.Thread(target=process, args=(inq, outq)).start() 
write_file(target_filename, outq) 

계속 증가하는 메모리 사용을 방지하기 위해 큐에 대한 maxsize을 설정하는 것이 좋습니다. 1000의 가치는 내 부분에 대한 임의의 선택입니다.

+0

20 ~ 30GB의 파일을 가지고 있는데이 스크립트를 실행하면 상주 메모리가 증가하지 않을까요? –

+1

@Satarangi_Re 예, 대기열의 크기를 제한하는 것이 좋습니다. 그에 따라 수정 된 답변. –

+0

나는 이것을 QUEUE _SIZE = 1000과 10으로 시도했다 .... 메모리가 증가하고있다 .... 이것을 제한하는 방법이 있는가? OS에 의해 어떻게 죽거나 죽을지 극단적 인 방법으로 보지 않았습니다. 어떠한 제안 ...? –

2

는 처리 단계, 즉, 상대적으로 시간이 오래 걸릴 않습니다, 그것은 CPU-intenstive입니까? 그렇지 않다면, 아니요, 당신은 스레딩이나 멀티 프로세싱으로 그다지 승리하지 못합니다. 처리 비용이 비싼 경우 예. 그래서, 당신은 확실히 알고 프로파일해야합니다.

파일을 읽는 데 상대적으로 더 많은 시간을 소비하는 경우, 즉 처리보다 크면 스레드를 사용하여 성능을 얻을 수 없으며 병목 현상은 스레드가 향상시키지 못하는 IO입니다.

3

이것은 정확히 을 분석하려고하면 안되는 정확한 종류입니다. 대신에을 분석해야하지만 프로파일해야합니다.

스레딩은 행당 처리량이 많은 경우에만 도움이된다는 점에 유의하십시오. 또 다른 방법은 전체 파일을 메모리에 버리고 메모리에서 처리하는 것입니다. 이렇게하면 스레딩을 방지 할 수 있습니다.

줄마다 스레드를 가지고 있는지 여부는 미세 조정을위한 것이지만, 줄을 파싱하지 않으면 고정 된 수의 작업자 스레드를 사용하는 것이 좋습니다.

다른 대안이 있습니다 : 하위 프로세스를 생성하고 해당 하위 프로세스를 읽고 처리하도록합니다. 문제에 대한 설명을 감안할 때, 나는 이것이 당신에게 최고의 속도 향상을 줄 것으로 기대합니다. memcached (또는 이와 유사한 시스템이나 관계형 데이터베이스)와 같이 읽기 속도를 높이기 위해 일종의 메모리 내 캐싱 시스템을 사용할 수도 있습니다.

3

CPython에서 스레드는 global interpreter lock으로 제한됩니다. 실제로 한 번에 하나의 스레드 만 파이썬 코드를 실행할 수 있습니다. 따라서 스레딩은 다음 중 하나 일 경우에만 유용합니다.

  1. 전역 해석기 잠금을 필요로하지 않는 처리를하고 있습니다. 또는

  2. I/O를 차단하는 데 시간을 소비하고 있습니다. (1)이 Python Imaging Library의 화상에 필터를 적용하거나 numpy에서 행렬의 고유 값을 찾는 것을 포함

예. (2)의 예는 사용자 입력 대기 또는 네트워크 연결이 데이터 전송을 완료 할 때까지 대기하는 것을 포함합니다.

CPython에서 스레드를 사용하여 코드를 가속화 할 수 있는지 여부는 do_something 호출에서 정확히 수행 한 작업에 따라 다릅니다. (하지만 파이썬에서 라인을 파싱하는 경우 스레드를 실행하여 속도를 높일 수는 없습니다.) 또한 스레드를 시작하면 결과를 작성할 때 동기화 문제가 발생합니다. 타겟 파일. 스레드가 시작된 순서대로 완료된다는 보장이 없으므로 출력이 올바른 순서로 출력되도록주의해야합니다.

여기에는 입력 읽기, 출력 쓰기 및 각 행 처리를위한 스레드를위한 스레드가있는 최대 스레드 구현이 있습니다. 단일 스레드 버전 (또는 3 개의 스레드 만있는 Janne 버전)보다 빠르거나 느린 지 여부를 테스트하면 알 수 있습니다.

from threading import Thread 
from Queue import Queue 

def process_file(f, source_filename, target_filename): 
    """ 
    Apply the function `f` to each line of `source_filename` and write 
    the results to `target_filename`. Each call to `f` is evaluated in 
    a separate thread. 
    """ 
    worker_queue = Queue() 
    finished = object() 

    def process(queue, line): 
     "Process `line` and put the result on `queue`." 
     queue.put(f(line)) 

    def read(): 
     """ 
     Read `source_filename`, create an output queue and a worker 
     thread for every line, and put that worker's output queue onto 
     `worker_queue`. 
     """ 
     with open(source_filename) as source: 
      for line in source: 
       queue = Queue() 
       Thread(target = process, args=(queue, line)).start() 
       worker_queue.put(queue) 
     worker_queue.put(finished) 

    Thread(target = read).start() 
    with open(target_filename, 'w') as target: 
     for output in iter(worker_queue.get, finished): 
      target.write(output.get()) 
+0

나는 GIL에 대해 알고 있으며 주 스레드가 I/O를 기다리는 동안 다른 스레드를 실행하는 것이 합리적인지 알아 내려고합니다. –

관련 문제