2011-10-29 7 views
12

여러 프로세스를 사용하여 체크섬을 계산하여 여러 코어를 활용하는 클래스를 작성하려고합니다. 나는 이것을 위해 아주 간단한 클래스를 가지고 있으며, 간단한 경우를 실행할 때 훌륭하게 작동한다. 그러나 두 개 이상의 클래스 인스턴스를 만들 때마다 작업자는 종료되지 않습니다. 파이프가 부모에 의해 닫혔다는 메시지를받지 못하는 것 같습니다.파이썬 다중 처리 파이프 사용

모든 코드는 아래에서 찾을 수 있습니다. 먼저 작동하는 md5와 sha1 체크섬을 개별적으로 계산 한 다음 병렬로 계산을 시도한 다음 파이프를 닫을 때 프로그램을 잠급니다.

여기에 무슨 일이 일어나고 있습니까? 내가 예상 한대로 파이프가 작동하지 않는 이유는 무엇입니까? 큐에 "Stop"메시지를 보내어 아이가 그런 식으로 종료하게하는 방법으로 해결할 수 있다고 생각합니다. 그러나 이것이 왜 그렇게 작동하지 않는지 정말 알고 싶습니다.

import multiprocessing 
import hashlib 

class ChecksumPipe(multiprocessing.Process): 
    def __init__(self, csname): 
     multiprocessing.Process.__init__(self, name = csname) 
     self.summer = eval("hashlib.%s()" % csname) 
     self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False) 
     self.result_queue = multiprocessing.Queue(1) 
     self.daemon = True 
     self.start() 
     self.child_conn.close() # This is the parent. Close the unused end. 

    def run(self): 
     self.parent_conn.close() # This is the child. Close unused end. 
     while True: 
      try: 
       print "Waiting for more data...", self 
       block = self.child_conn.recv_bytes() 
       print "Got some data...", self 
      except EOFError: 
       print "Finished work", self 
       break 
      self.summer.update(block) 
     self.result_queue.put(self.summer.hexdigest()) 
     self.result_queue.close() 
     self.child_conn.close() 

    def update(self, block): 
     self.parent_conn.send_bytes(block) 

    def hexdigest(self): 
     self.parent_conn.close() 
     return self.result_queue.get() 


def main(): 
    # Calculating the first checksum works 
    md5 = ChecksumPipe("md5") 
    md5.update("hello") 
    print "md5 is", md5.hexdigest() 

    # Calculating the second checksum works 
    sha1 = ChecksumPipe("sha1") 
    sha1.update("hello") 
    print "sha1 is", sha1.hexdigest() 

    # Calculating both checksums in parallel causes a lockup! 
    md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1") 
    md5.update("hello") 
    sha1.update("hello") 
    print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest() # Lockup here! 

main() 

PS. 사람이 관심이 있다면이 문제는 위의 코드의 작업 버전을 여기을 해결되었습니다

import multiprocessing 
import hashlib 

class ChecksumPipe(multiprocessing.Process): 

    all_open_parent_conns = [] 

    def __init__(self, csname): 
     multiprocessing.Process.__init__(self, name = csname) 
     self.summer = eval("hashlib.%s()" % csname) 
     self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False) 
     ChecksumPipe.all_open_parent_conns.append(self.parent_conn) 
     self.result_queue = multiprocessing.Queue(1) 
     self.daemon = True 
     self.start() 
     self.child_conn.close() # This is the parent. Close the unused end. 

    def run(self): 
     for conn in ChecksumPipe.all_open_parent_conns: 
      conn.close() # This is the child. Close unused ends. 
     while True: 
      try: 
       print "Waiting for more data...", self 
       block = self.child_conn.recv_bytes() 
       print "Got some data...", self 
      except EOFError: 
       print "Finished work", self 
       break 
      self.summer.update(block) 
     self.result_queue.put(self.summer.hexdigest()) 
     self.result_queue.close() 
     self.child_conn.close() 

    def update(self, block): 
     self.parent_conn.send_bytes(block) 

    def hexdigest(self): 
     self.parent_conn.close() 
     return self.result_queue.get() 

def main(): 
    # Calculating the first checksum works 
    md5 = ChecksumPipe("md5") 
    md5.update("hello") 
    print "md5 is", md5.hexdigest() 

    # Calculating the second checksum works 
    sha1 = ChecksumPipe("sha1") 
    sha1.update("hello") 
    print "sha1 is", sha1.hexdigest() 

    # Calculating both checksums also works fine now 
    md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1") 
    md5.update("hello") 
    sha1.update("hello") 
    print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest() 

main() 
+0

'self.parent_conn.close()'다음에'ChecksumPipe.all_open_parent_conns.remove (self.parent_conn)'를 추가하여 연결을 끊을 수 있습니다. –

+0

'self.summer = eval ("hashlib. % s()"% csname)'이보기 흉합니다. 'self.summer = getattr (hashlib, csname)()은 어떨까요? – glglgl

답변

7

그래, 즉 참으로 놀라운 동작입니다.

그러나 두 개의 병렬 하위 프로세스에 대해 lsof의 출력을 보면 두 번째 하위 프로세스에 더 많은 파일 설명자가 열려 있음을 쉽게 알 수 있습니다.

두 개의 병렬 하위 프로세스가 시작될 때 두 번째 자식이 부모 파이프를 상속하므로 부모가 self.parent_conn.close()을 호출하면 두 번째 자식이 여전히 해당 파이프 파일 설명자가 열려 있으므로 파이프 파일 설명이 열리지 않습니다 첫 번째 병렬 자식 프로세스에서 self.child_conn.recv_bytes()read()EOFEOFError이 절대로 throw되지 않는다는 의미에서 커널에서 닫히지 않습니다 (참조 카운트가 0보다 큽니다).

어떤 프로세스에서 어떤 파일 설명자가 공유되는지 거의 제어 할 수 없으므로 파일 설명자를 닫는 대신 명시 적 종료 메시지를 보내야 할 수도 있습니다 (포크 파일 설명자 플래그가 없음).

+0

감사합니다. 그것은 나를 위해 일을 정리했다. 필자는 모든 인스턴스에서 열려있는 모든 연결을 포함하는 공유 클래스 변수를 사용하여 예제에서이를 해결 했으므로 필요없는 모든 소켓을 닫을 수 있습니다. –

관련 문제