2016-06-21 9 views
1

while 루프를 빠르게하려면 Python에서 다중 처리를 사용하고 싶습니다.조건부 for 루프의 병렬 처리/다중 처리

더 구체적으로는
저는 매트릭스 (샘플 * 기능)가 있습니다. 임의의 하위 집합의 특징 값이 특정 값 (이 경우 -1)과 같지 않은 x 개의 하위 집합을 선택하려고합니다.

내 일련 번호 :

np.random.seed(43) 
datafile = '...' 
df = pd.read_csv(datafile, sep=" ", nrows = 89) 

no_feat = 500 
no_samp = 5 
no_trees = 5 
i=0 
iter=0 


samples = np.zeros((no_trees, no_samp)) 
features = np.zeros((no_trees, no_feat)) 

while i < no_trees: 
    rand_feat = np.random.choice(df.shape[1], no_feat, replace=False) 
    iter_order = np.random.choice(df.shape[0], df.shape[0], replace=False) 

    samp_idx = [] 
    a=0 

#-------------- 
    #how to run in parallel? 

    for j in iter_order: 
     pot_samp = df.iloc[j, rand_feat] 
     if len(np.where(pot_samp==-1)[0]) == 0: 
      samp_idx.append(j) 
     if len(samp_idx) == no_samp: 
      print a 
      break 
     a+=1 

#-------------- 

    if len(samp_idx) == no_samp: 
     samples[i,:] = samp_idx 
     features[i, :] = rand_feat 
     i+=1 
    iter+=1 
    if iter>1000: #break if subsets cannot be found 
     break 

피팅 샘플을 검색 이론에 병렬로 실행할 수있는 가능성이 비싼 부분 (for 루프 J)이다. 어떤 경우에는, 충분히 큰 서브 세트를 찾기 위해 모든 샘플을 반복 할 필요가 없기 때문에, 서브 세트가 충분히 크자 마자 루프에서 빠져 나가는 이유가 있습니다.
나는 얼마나 많은 유효한 결과가 이미 생성되었는지를 확인할 수있는 구현을 찾기 위해 고심하고있다. 심지어 가능할까요?

전에 joblib을 사용했습니다. 내가 올바르게 이해한다면 이것은 pool 멀티 프로세싱의 방법을 별도의 작업을 위해서만 작동하는 백엔드로 사용합니까? queues이 도움이 될지 모르지만 지금까지는 구현에 실패했습니다.

+0

: 나는 =하지만 많은)

내 코드를 배웠습니다. 나는 각 코어 당 하나의 프로세스를 실행하고, 공유 카운터를 생성하고, "잠금"에 의해 보호되거나 원자 정수로 구현되며, 특정 계수 (중복을 고려하여)에 도달 할 때까지 증가시킨 다음 모든 프로세스가 완료되고, 결과를 반환합니다. (아마도'apply_async()'를 사용하면된다). – advance512

+1

@ advance512이 방법을 사용해 주셔서 감사합니다. – Dahlai

답변

0

실용적인 해결책을 찾았습니다. while 루프를 병렬로 실행하고 다른 프로세스가 공유 카운터를 통해 상호 작용하도록 결정했습니다. 또한 적절한 샘플 검색을 벡터화했습니다.

벡터화는 ~ 300x 속도 향상을 가져 왔으며 4 코어에서 실행하면 계산 속도가 두 배 빨라졌습니다.

먼저 별도의 프로세스를 구현하고 결과를 queue에 넣으려고했습니다. 많은 양의 데이터를 저장하는 것은 아닙니다.

다른 사람이 해당 코드에서 다른 병목 현상이 발견되면 누군가 지적한 경우 기쁠 것입니다.

기본적으로 병렬 컴퓨팅에 대한 지식이 없기 때문에 나는 인터넷을 통한 예제가 모두 매우 기본적인 것이기 때문에이 문제를 함께 해결하기가 정말 힘들다는 것을 알게되었습니다. multiprocessing.pool` 말이`joblib` 또는 '사용

import numpy as np 
import pandas as pd 
import itertools 
from multiprocessing import Pool, Lock, Value 
from datetime import datetime 
import settings 


val = Value('i', 0) 
worker_ID = Value('i', 1) 
lock = Lock() 

def findSamp(no_trees, df, no_feat, no_samp): 
    lock.acquire() 
    print 'starting worker - {0}'.format(worker_ID.value) 
    worker_ID.value +=1 
    worker_ID_local = worker_ID.value 
    lock.release() 

    max_iter = 100000 
    samp = [] 
    feat = [] 
    iter_outer = 0 
    iter = 0 
    while val.value < no_trees and iter_outer<max_iter: 
     rand_feat = np.random.choice(df.shape[1], no_feat, replace=False 

     #get samples with random features from dataset; 
     #find and select samples that don't have missing values in the random features 
     samp_rand = df.iloc[:,rand_feat] 
     nan_idx = np.unique(np.where(samp_rand == -1)[0]) 
     all_idx = np.arange(df.shape[0]) 
     notnan_bool = np.invert(np.in1d(all_idx, nan_idx)) 
     notnan_idx = np.where(notnan_bool == True)[0] 

     if notnan_idx.shape[0] >= no_samp: 
      #if enough samples for random feature subset, select no_samp samples randomly 
      notnan_idx_rand = np.random.choice(notnan_idx, no_samp, replace=False) 
      rand_feat_rand = rand_feat 

      lock.acquire() 
      val.value += 1 
      #x = val.value 
      lock.release() 
      #print 'no of trees generated: {0}'.format(x) 
      samp.append(notnan_idx_rand) 
      feat.append(rand_feat_rand) 

     else: 
      #increase iter_outer counter if no sample subset could be found for random feature subset 
      iter_outer += 1 

     iter+=1 
    if iter >= max_iter: 
     print 'exiting worker{0} because iter >= max_iter'.format(worker_ID_local) 
    else: 
     print 'worker{0} - finished'.format(worker_ID_local) 
    return samp, feat 

def initialize(*args): 
    global val, worker_ID, lock 
    val, worker_ID, lock = args 

def star_findSamp(i_df_no_feat_no_samp): 
    return findSamp(*i_df_no_feat_no_samp) 


if __name__ == '__main__': 
    np.random.seed(43) 
    datafile = '...' 
    df = pd.read_csv(datafile, sep=" ", nrows = 89) 
    df = df.fillna(-1) 
    df = df.iloc[:, 6:] 

    no_feat = 700 
    no_samp = 10 
    no_trees = 5000 


    startTime = datetime.now() 
    print 'starting multiprocessing' 
    ncores = 4 
    p = Pool(ncores, initializer=initialize, initargs=(val, worker_ID, lock)) 
    args = itertools.izip([no_trees]*ncores, itertools.repeat(df), itertools.repeat(no_feat), itertools.repeat(no_samp)) 

    result = p.map(star_findSamp, args)#, callback=log_result) 
    p.close() 
    p.join() 

    print '{0} sample subsets for tree training have been found'.format(val.value) 

    samples = [x[0] for x in result if x != None] 
    samples = np.vstack(samples) 
    features = [x[1] for x in result if x != None] 
    features = np.vstack(features) 
    print datetime.now() - startTime