여러 프로세스를 사용하여 체크섬을 계산하여 여러 코어를 활용하는 클래스를 작성하려고합니다. 나는 이것을 위해 아주 간단한 클래스를 가지고 있으며, 간단한 경우를 실행할 때 훌륭하게 작동한다. 그러나 두 개 이상의 클래스 인스턴스를 만들 때마다 작업자는 종료되지 않습니다. 파이프가 부모에 의해 닫혔다는 메시지를받지 못하는 것 같습니다.파이썬 다중 처리 파이프 사용
모든 코드는 아래에서 찾을 수 있습니다. 먼저 작동하는 md5와 sha1 체크섬을 개별적으로 계산 한 다음 병렬로 계산을 시도한 다음 파이프를 닫을 때 프로그램을 잠급니다.
여기에 무슨 일이 일어나고 있습니까? 내가 예상 한대로 파이프가 작동하지 않는 이유는 무엇입니까? 큐에 "Stop"메시지를 보내어 아이가 그런 식으로 종료하게하는 방법으로 해결할 수 있다고 생각합니다. 그러나 이것이 왜 그렇게 작동하지 않는지 정말 알고 싶습니다.
import multiprocessing
import hashlib
class ChecksumPipe(multiprocessing.Process):
def __init__(self, csname):
multiprocessing.Process.__init__(self, name = csname)
self.summer = eval("hashlib.%s()" % csname)
self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
self.result_queue = multiprocessing.Queue(1)
self.daemon = True
self.start()
self.child_conn.close() # This is the parent. Close the unused end.
def run(self):
self.parent_conn.close() # This is the child. Close unused end.
while True:
try:
print "Waiting for more data...", self
block = self.child_conn.recv_bytes()
print "Got some data...", self
except EOFError:
print "Finished work", self
break
self.summer.update(block)
self.result_queue.put(self.summer.hexdigest())
self.result_queue.close()
self.child_conn.close()
def update(self, block):
self.parent_conn.send_bytes(block)
def hexdigest(self):
self.parent_conn.close()
return self.result_queue.get()
def main():
# Calculating the first checksum works
md5 = ChecksumPipe("md5")
md5.update("hello")
print "md5 is", md5.hexdigest()
# Calculating the second checksum works
sha1 = ChecksumPipe("sha1")
sha1.update("hello")
print "sha1 is", sha1.hexdigest()
# Calculating both checksums in parallel causes a lockup!
md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
md5.update("hello")
sha1.update("hello")
print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest() # Lockup here!
main()
PS. 사람이 관심이 있다면이 문제는 위의 코드의 작업 버전을 여기을 해결되었습니다
import multiprocessing
import hashlib
class ChecksumPipe(multiprocessing.Process):
all_open_parent_conns = []
def __init__(self, csname):
multiprocessing.Process.__init__(self, name = csname)
self.summer = eval("hashlib.%s()" % csname)
self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
ChecksumPipe.all_open_parent_conns.append(self.parent_conn)
self.result_queue = multiprocessing.Queue(1)
self.daemon = True
self.start()
self.child_conn.close() # This is the parent. Close the unused end.
def run(self):
for conn in ChecksumPipe.all_open_parent_conns:
conn.close() # This is the child. Close unused ends.
while True:
try:
print "Waiting for more data...", self
block = self.child_conn.recv_bytes()
print "Got some data...", self
except EOFError:
print "Finished work", self
break
self.summer.update(block)
self.result_queue.put(self.summer.hexdigest())
self.result_queue.close()
self.child_conn.close()
def update(self, block):
self.parent_conn.send_bytes(block)
def hexdigest(self):
self.parent_conn.close()
return self.result_queue.get()
def main():
# Calculating the first checksum works
md5 = ChecksumPipe("md5")
md5.update("hello")
print "md5 is", md5.hexdigest()
# Calculating the second checksum works
sha1 = ChecksumPipe("sha1")
sha1.update("hello")
print "sha1 is", sha1.hexdigest()
# Calculating both checksums also works fine now
md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
md5.update("hello")
sha1.update("hello")
print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest()
main()
'self.parent_conn.close()'다음에'ChecksumPipe.all_open_parent_conns.remove (self.parent_conn)'를 추가하여 연결을 끊을 수 있습니다. –
'self.summer = eval ("hashlib. % s()"% csname)'이보기 흉합니다. 'self.summer = getattr (hashlib, csname)()은 어떨까요? – glglgl