2017-01-11 7 views
0

파일을 구문 분석하고 나중에 병렬 처리해야하는 큰 목록으로 변환하는 스크립트를 만들려고합니다. 나는 파이썬의 멀티 프로세싱에 대한 몇 가지 구현을 시도했지만, 모두 순차적으로 실행되는 것처럼 보인다.파이썬 다중 처리 공유 목록

def grouper(n, iterable, padvalue=None): 
    """grouper(3, 'abcdefg', 'x') --> 
    ('a','b','c'), ('d','e','f'), ('g','x','x')""" 
    return izip_longest(*[iter(iterable)]*n, fillvalue=padvalue) 

def createRecords(givenchunk): 
    for i1 in range(len(givenchunk)): 
    <create somedata> 
    records.append(somedata) 

if __name__=='__main__': 
    manager = Manager() 
    parsedcdrs = manager.list([]) 
    records = manager.list([]) 

    <some general processing here which creates a shared list "parsedcdrs". Uses map to create a process "p" in some def which is terminated afterwards.> 

    # Get available cpus 
    cores = multiprocessing.cpu_count() 

    # First implementation with map with map. 
    t = multiprocessing.Pool(cores) 
    print "Map processing with chunks containing 5000" 
    t.map(createRecords, zip(parsedcdr), 5000) 

    # Second implementation with async. 
    t = multiprocessing.Pool(cores) 
    for chunk in grouper(5000, parsedcdr): 
    print "Async processing with chunks containing 5000" 
    t.apply_async(createRecords, args=(chunk,), callback=log_result) 
    t.close() 
    t.join() 

    # Third implementation with Process. 
    jobs = [] 
    for chunk in grouper(5000, parsedcdr): 
    t = multiprocessing.Process(target=createRecords, args=(chunk,)) 
    t.start() 
    jobs.append(t) 
    print "Process processing with chunks containing 5000" 
    for j in jobs: 
    j.join() 
    for j in jobs: 
    j.join() 

누군가 올바른 방향으로 나를 가리킬 수 있습니까?

+0

귀하의 첫 번째 구현에서 볼 수있는 한, 귀하의 aproach는 거의 괜찮습니다. 그러나 한 가지 질문으로 처리 할 목록의 각 요소는 다른 목록 (또는 반복 가능)입니까? "parsedcdr"목록 – Netwave

+0

요소는 예를 들어, 실제로 다른리스트이다 : '[1482232410, ['astp3 'u'elem1'u'elem2 'u'elem3'], [1,482,232,576 , [ 'astp3', 'u'elem4', u'elem5 ', u'elem6']]] ' – driesken

+1

첫 번째 구현은 문제없이 작동해야하며, 왜 그들이 제 2 세계에서 실행되고 있다고 생각합니까? 또한 '5000'덩어리를 삭제하여 하나씩 선택하는지 확인하십시오 – Netwave

답변

0

위 예제에서 다중 처리가 정상적으로 작동하는 것 같습니다. 문제가 다른 def에 위치하여 성능이 저하되었습니다.