1

풀을 사용하여 16 개의 프로세서 사이에서 모든 작업을 균등하게 분배하려고합니다. 내가 처음 발견 한 것은 16 개의 프로세스가 생성된다는 것입니다. 몇 초 후에 2 개의 프로세스 만 적은 수의 작업에 대해 나머지 작업을 모두 실행합니다. 아무리로드를 증가 시키더라도 프로세스를 진행하는 프로세스가 줄어들지 않는 것 같습니다. 결국 남은 작업은 1 ~ 2 개의 프로세스 만 통과합니다.작업을 균등하게 분배하지 않는 다중 처리 - Python

다음은 내 코드의 다중 처리 스 니펫입니다.

c_size = len(sampled_patterns)/(cpu_count() -1) 

pool = Pool(processes=cpu_count() -1) 
works = [(pattern, support_set, hit_rates) for pattern,support_set in sampled_patterns.items()] 
pool.starmap(get_hit_rules,works, chunksize=int(c_size)) 

병렬 처리를 최대화하기 위해 16 개의 프로세서를 모두 사용하고 있습니까? 감사!

편집! 작업을 배포하는 방법입니다. 키로 pid 및 값으로 작업 수로 카운터.

Counter({30179: 14130, 30167: 13530, 30169: 12900, 30173: 12630, 30165: 12465, 30177: 12105, 30163: 11820, 30175: 11460, 30161: 10860, 30181: 10725, 30183: 9855, 30157: 8695, 30159: 6765, 30171: 4860, 30155: 1770}) 
+0

'chunksize'는 당신이하고있는 일을하지 않고 있습니다 -'it'' iterable을 모든 것 위에 균등하게 덩어리로 나누고 싶다면 풀에있는 숫자 프로세스 (예 :'pool._processes')로 설정하십시오 풀의 프로세스 Tho, 그렇게하고 싶다면 진짜 질문은 어째서 '수영장'이 필요한지입니다. – zwer

+0

감사합니다. 이것은 다중 처리를 사용하는 첫 번째 코드입니다. 코드가 많은 프로세스를 생성하는 것보다 덜 무서운 것처럼 보이기 때문에 저는 Pool을 사용하고 있습니다. 나는 수영장이 저를 위해 그것을 돌볼 것이라고 생각했습니다. 방금 풀 읽기 대 프로세스에 대해 더 나은 방법이 있습니까? – Raja

+0

나는'작품들 '에 수십억 가지의 아이템이있을지라도 수백만 달러를 가질 것이다. 그래서'Pool'은 많은'Process'만큼 스폰하는 것보다 더 적합하다고 생각합니다. – Raja

답변

0

좋아, 나는 이것을 답으로 확장 할 것이다.

multiprocessing.Pool의 요점은 여러 프로세스를 생성 한 다음 첫 번째 자유 우선 작업 방식으로 작업을 분배하는 것입니다. 즉, 처리 할 항목이 n이고 풀에 p 수의 프로세스가있는 경우 p (또는 p * chunksize이 정의 된 경우 chunksize이 정의 됨) 수를 선택하고 각 항목을 별도의 처리 프로세스로 보냅니다. 프로세스가 항목 처리를 끝내고 효과적으로 해제 된 후에도 처리되지 않은 항목이있는 경우 풀은 더 이상 항목이 남아 있지 않을 때까지 다음 항목을 선택하여 해제 된 프로세스로 보내고 계속됩니다. 이렇게하면 직접 배포를 관리 할 필요없이 생성 된 프로세스를 최적으로 활용할 수 있습니다.

이는 또한 multiprocessing.Pool이 모든 상황에 적합하지 않음을 의미합니다. 귀하의 경우, 제시된 코드를 기반으로 고정 된 수의 프로세스에서 반복 가능한 iterable을 분할하고 싶으므로 풀이 오버 헤드 일 뿐이며 프로세스가 완료되면 배포 할 데이터가 더 이상 없습니다. 프로세스의 cpu_count()을 시작하고 (각 보내 -

import multiprocessing 

if __name__ == "__main__": # always guard your multiprocessing code 
    cores = max(multiprocessing.cpu_count() - 1, 1) # ensure at least one process 

    works = [(p, s, hit_rates) for p, s in sampled_patterns.items()] 
    chunk_size = (len(works) + cores - 1) // cores # rough chunk size estimate 

    processes = [] # a simple list to hold our process references 
    for i in range(cores): 
     work_set = works[i*chunk_size:(i+1)*chunk_size] 
     process = multiprocessing.Process(target=get_hit_rules, args=(work_set,)) 
     process.start() 
     processes.append(process) 

    results = [process.join() for process in processes] # get the data back 

이것은 당신이 시도했습니다 정확히 할 것입니다 : 당신은 단지 데이터를 분할하여 다른 프로세스에 각 청크를 보낼 경우로는 간단합니다 대략적으로, 마지막 프로세스는 모든 데이터가 한 번에 병렬로 처리되는 방식으로 데이터의 평균 크기가 균등하게 작은 평균 데이터를 얻습니다.

물론 데이터가 너무 커서 메모에 추가로 명확히 설명 할 수없는 경우 결국 관리가 불가능 해지고 multiprocessing.Pool으로 되돌아 가서 생성 된 프로세스에 데이터를 관리 가능한 덩어리로 보내면 처리 할 수 ​​있습니다. 행. 또한 works 목록을 작성하는 것도 의미가 없습니다. 왜 sampled_patterns 사전에 이미 데이터가있는 수십억 개의 항목으로 목록을 작성하고 싶습니까?

중간 목록을 작성하지 않고 sampled_patterns dict의 개별 항목을 보내면 multiprocessing.Pool에 매핑 할 수 있습니다.이렇게하려면, 당신이 필요로하는 모든 그래서, 반복자 슬라이서의 일종을 만드는 대신 multiprocessing.Pool.imap에 먹이를 풀 내부적으로 나머지를 관리 할 수 ​​있도록하는 것입니다 : 물론

import multiprocessing 

def patterns_slicer(patterns, size, hit_rates): 
    pos = 0 # store our current position 
    patterns = patterns.items() # use the items iterator 
    while pos < len(patterns): 
     yield [(p, s, hit_rates) for p, s in patterns[pos:pos+size]] 
     pos += size 

if __name__ == "__main__": # always guard your multiprocessing code 
    cores = max(multiprocessing.cpu_count() - 1, 1) # ensure at least one process 
    pool = multiprocessing.Pool(processes=cores) 
    # lets use chunks of 100 patterns each 
    results = pool.imap(get_hit_rules, patterns_slicer(sampled_patterns, 100, hit_rates)) 

, multiprocessing.Pool.imap은 많이하지 lookahead. 원래 데이터가 너무 크거나 거대한 청크를 사용하려는 경우 just-in-time 데이터 검색을 사용하여 자신의 을 구현하는 것이 좋습니다. 예를 들어 this answer을 확인하십시오.

+0

시간과 노력에 감사드립니다! 고맙습니다. 나는 아직 내 코드로 작업하는 imap을 얻을 수 없었다. 한 가지 언급해야 할 것은 모든 프로세스의 결과를 업데이트하기 위해'Manager() .dict()'를 사용하고 있다는 것입니다. imap에 제너레이터로 일하면서 작동합니다. 하지만 iterator_slicer를 전달할 때 아무 것도하지 않습니다. – Raja

관련 문제