1

이것이 나의 첫 번째 멀티 프로세싱 구현이다. 나는 순차적 인 접근 방식으로 코드를 실행했고 20 초 정도의 레코드를 처리하는 데 30 초 정도 걸린다. 하지만 각 키가 레코드 집합을 가진 사전을 만들고 모든 키에 대해 pool.map을 사용하여 함수를 적용하려고했습니다. 이제는 각 프로세스마다 각 코어를 할당하지만 2 분 이상 처리 할 수 ​​있습니다. 누군가 나를 최적화하도록 도울 수 있습니까?파이썬 - 멀티 프로세싱은 순차적 인 것보다 느리다

def f(values): 
    data1 = itertools.combinations(values,2) 
    tuple_attr =('Age', 'Workclass', 'Fnlwgt', 'Education', 'Education-num', 'marital-status', 'Occupation', 'Relationship', 'Race', 'Sex', 'Capital-gain', 'Capital-loss', 'Hours-per-week', 'Native country', 'Probability', 'Id') 
    new = ((tuple_attr[i] for i, t in enumerate(zip(*pair)) if t[0]!=t[1]) for pair in data1) 
    skt = set(frozenset(temp) for temp in new) 
    newset = set(s for s in skt if not any(p < s for p in skt)) 

    empty = frozenset(" ") 
    tr_x = set(frozenset(i) for i in empty) 
    tr = set(frozenset(i) for i in empty) 
    for e in newset: 
     tr.clear() 
     tr = tr.union(tr_x) 
     tr_x.clear() 
     for x in tr: 
      for a in e: 
       if x == empty: 
        tmp = frozenset(frozenset([a])) 
        tr_x = tr_x.union([tmp]) 
       else : 
        tmp = frozenset(frozenset([a]).union(x)) 
        tr_x = tr_x.union([tmp]) 
     tr.clear() 
     tr = tr.union(tr_x) 
     tr = set(l for l in tr if not any(m < l for m in tr)) 

    return tr 

def main(): 
    p = Pool(len(data)) #number of processes = number of CPUs 
    keys, values= zip(*data.items()) #ordered keys and values 
    processed_values= p.map(f, values) 
    result= dict(zip(keys, processed_values)) 
    p.close() # no more tasks 
    p.join() # wrap up current tasks 
    print(result) 


if __name__ == '__main__': 
    import csv 
    dicchunk = {*****} #my dictionary 
    main() 
+0

큰 데이터 세트로 시도해보십시오. 다중 쓰레드/프로세스가 문맥 전환, 포킹 등의 오버 헤드에 걸리기 전에 꽤 많은 작업이 필요합니다. –

+0

이것은 하나의 지저분한 코드입니다. 실제로는 변수 이름을 사용해야합니다. @ 코리가 내 의견을 타이핑 할 때 - 큰 데이터 세트를 말할 때. 그러나 너무 비싸지는 않을 것이기 때문에 기억이 아주 비쌀 것입니다. –

+1

샘플 데이터 세트를 제공 할 수 있습니까? – dano

답변

1

나는 multiprocessing 한 번이를 실행하는 테스트 프로그램을 만들어, 한 번없이 :하지만 많은하여,

multi: 191.249588966 
non-multi: 225.774535179 

multiprocessing가 빠르다 : 여기

def main(data): 
    p = Pool(len(data)) #number of processes = number of CPUs 
    keys, values= zip(*data.items()) #ordered keys and values 
    start = time.time() 
    processed_values= p.map(f, values) 
    result= dict(zip(keys, processed_values)) 
    print("multi: {}".format(time.time() - start)) 
    p.close() # no more tasks 
    p.join() # wrap up current tasks 

    start = time.time() 
    processed_values = map(f, values) 
    result2 = dict(zip(keys, processed_values)) 
    print("non-multi: {}".format(time.time() - start)) 
    assert(result == result2) 

출력의 예상대로. 그 이유는 하위 목록 중 일부는 다른 사람보다 (몇 분) 정도 더 오래 걸릴 수 있기 때문입니다. 가장 큰 하위 목록을 처리하는 데 시간이 오래 걸리지는 않습니다.

작업자 함수에 일부 추적을 추가하여이를 보여줍니다. 나는 작업자의 시작에서 시간을 절약하고, 끝에서 그것을 인쇄했다. 당신은 순차적 인 대 약 44 초 절약하고, 그래서 당신이 볼 수 있듯이

<Process(PoolWorker-4, started daemon)> is done. Took 0.940237998962 seconds 
<Process(PoolWorker-2, started daemon)> is done. Took 1.28068685532 seconds 
<Process(PoolWorker-1, started daemon)> is done. Took 42.9250118732 seconds 
<Process(PoolWorker-3, started daemon)> is done. Took 193.635578156 seconds 

이, 노동자가 작업의 매우 불평등 한 양을하고있다 : 여기에 출력합니다.

+0

좋아요. 나는 그것을 얻었다, 나의 컴퓨터에서보고있는 것. 이 균형을 유지할 수있는 방법이 있습니까? 완료하는 작업자가 다음 사용 가능한 작업을 수행하는 방식으로 균형을 유지할 수있는 방법이 있습니까? 그리고 왜 어떤 하위 목록에만 오랜 시간이 걸릴지 모르기 때문에 다른 방법으로 코드를 최적화 할 수 있습니다. 이것은 단지 20 레코드입니다. 다음에 50k 레코드를 처리 할 것입니다. :( –

+0

'cpu_count()'항목 이상을'map'에 전달하면 작업자는 다음 항목을 다음과 같이 반복 할 것입니다. 불완전한 하위 목록들과이 문제의 균형을 잡으려고하는 것은 아마 까다로울 것입니다.'f'가하는 일을 나누어서 조각을 병렬 처리 할 수있는 방법을 찾아야합니다. 그들을 결합해라. 나는 그 방법에서 당신이 그 일을하는 가장 좋은 방법을 말할만큼 충분히 잘 이해하지 못한다. (또는 그것도 가능하다면) – dano

+0

Ok.그러나 나는 그 기능 내에서 내가하고있는 일을 나눌 수 없다. 그것은 순차적으로 행해져 야한다. 어쨌든, 고마워. –

관련 문제