2016-11-22 1 views
0

파이썬에서 계산 속도를 줄이기 위해 다중 처리를 구현하려고하지만 다중 처리 후에 전체 계산 속도가 크게 줄었습니다. 나는 4 개의 다른 프로세스를 만들었고 4 개의 다른 데이터 프레임으로 dataFrame을 분할했다. 이것은 각 프로세스의 입력이 될 것이다. 각 프로세스를 타이밍 한 후에 오버 헤드 비용이 중요한 것처럼 보이며 이러한 오버 헤드 비용을 줄이는 방법이 있는지 궁금해하고있었습니다.파이썬에서 다중 처리 시간을 단축하는 방법

windows7, python 3.5를 사용하고 있으며 컴퓨터에 8 개의 코어가 있습니다.

def doSomething(args, dataPassed,): 

    processing data, and calculating outputs 

def parallelize_dataframe(df, nestedApply): 
    df_split = np.array_split(df, 4) 
    pool = multiprocessing.Pool(4) 
    df = pool.map(nestedApply, df_split) 
    print ('finished with Simulation') 
    time = float((dt.datetime.now() - startTime).total_seconds()) 

    pool.close() 
    pool.join() 

def nestedApply(df): 

    func2 = partial(doSomething, args=()) 
    res = df.apply(func2, axis=1) 
    res = [output Tables] 
    return res 

if __name__ == '__main__': 

data = pd.read_sql_query(query, conn) 

parallelize_dataframe(data, nestedApply) 
+2

단일 스레딩이 멀티 프로세싱보다 오래 걸리는 횟수를 나열 할 수 있습니까? – Fruitspunchsamurai

+0

당신은 얼마나 많은 CPU/코어를 가지고 있습니까 (하이퍼 쓰레드가 아닌 실제 것입니까?). CPU 집약적 인 작업처럼 보이므로 코어 수 이상으로 분할하면 작업 속도가 느려집니다. 또한 데이터 프레임의 크기와 doSomething의 비용은 얼마나 큽니까? 데이터 프레임을 각 서브 프로세스로 가져 오려면 직렬화 (pickle을 통해)하고 직렬화를 해제해야합니다. 프레임이 크고'doSomething'이 값이 싼 경우에는 오버 헤드에 소요되는 시간을 대부분 볼 수 있습니다. –

+0

@ Fruitspunchsamurai 단일 스레드를 실행하는 데 26 분이 걸렸지 만 매핑 기능을 실행하는 데 33 분이 걸렸으며 전체적으로 71 분이 걸렸습니다. – Hojin

답변

0

DataFrame을 청크로 제공하는 대신 대기열을 사용하는 것이 좋습니다. 각 청크를 복사하려면 많은 자원이 필요하며 그렇게하려면 약간의 시간이 걸립니다. DataFrame이 정말 큰 경우 메모리가 부족할 수 있습니다. 대기열을 사용하면 팬더에서 빠른 반복자를 사용할 수 있습니다. 여기 내 접근 방식입니다. 오버 헤드는 작업자의 복잡성으로 인해 감소합니다. 불행히도, 제 노동자들은 실제로 그것을 보여주기가 쉽지 않습니다. 그러나 sleep은 조금 복잡합니다. 이 두 배 빠르다 numProc = 4으로 루프 당 50sec 얻어 numProc = 2를 사용

import pandas as pd 
import multiprocessing as mp 
import numpy as np 
import time 


def worker(in_queue, out_queue): 
    for row in iter(in_queue.get, 'STOP'): 
     value = (row[1] * row[2]/row[3]) + row[4] 
     time.sleep(0.1) 
     out_queue.put((row[0], value)) 

if __name__ == "__main__": 
    # fill a DataFrame 
    df = pd.DataFrame(np.random.randn(1e5, 4), columns=list('ABCD')) 

    in_queue = mp.Queue() 
    out_queue = mp.Queue() 

    # setup workers 
    numProc = 2 
    process = [mp.Process(target=worker, 
          args=(in_queue, out_queue)) for x in range(numProc)] 

    # run processes 
    for p in process: 
     p.start() 

    # iterator over rows 
    it = df.itertuples() 

    # fill queue and get data 
    # code fills the queue until a new element is available in the output 
    # fill blocks if no slot is available in the in_queue 
    for i in range(len(df)): 
     while out_queue.empty(): 
      # fill the queue 
      try: 
       row = next(it) 
       in_queue.put((row[0], row[1], row[2], row[3], row[4]), block=True) # row = (index, A, B, C, D) tuple 
      except StopIteration: 
       break 
     row_data = out_queue.get() 
     df.loc[row_data[0], "Result"] = row_data[1] 

    # signals for processes stop 
    for p in process: 
     in_queue.put('STOP') 

    # wait for processes to finish 
    for p in process: 
     p.join() 

.

관련 문제