ProcessPoolExecutor
GIL은 영향을받지 않는 장점을 갖는다. ThreadPoolExecutor
을 사용하면 I/O를 수행하지 않는 한 GIL이 한 번에 둘 이상의 스레드를 실행하지 못하게합니다. 다행스럽게도 두 스레드가 주로 I/O를 수행하는 것처럼 보이지만 웹 서비스 호출 전후에 각 스레드에서 발생하는 모든 종류의 처리가 실제로 동시에 발생하지 않으므로 성능이 저하됩니다. ProcessPoolExecutor
에는 이러한 제한이 없지만 프로세스간에 데이터가 group_id
및 host
인 추가 오버 헤드가 있습니다. 수만개의 호스트가있는 경우 프로세스간에 이들 호스트를 한 번에 보내는 것은 상당한 오버 헤드가 발생할 것입니다.
이 변경 사항만으로도 성능이 너무 많이 변하지 않을 것이라고 생각합니다. 결국에는 각 호스트를 한 번에 하나씩 처리 스레드로 보내려고하기 때문입니다.
숫자 3은 작업자 스레드가 실제로 I/O를 거의 수행하지 않는 경우이 방법이 정상적으로 작동 할 수 있습니다. 그러나 스레드를 사용하면 작업자가 수행하는 CPU 관련 작업이 모두 성능을 저하시킬 수 있습니다. 나는 정확한 프로그램 레이아웃을 가져다가이처럼 두 노동자를 구현 :
def call_ws_1(group_id):
return list(range(20))
def call_ws_2(host):
sum(range(33000000)) # CPU-bound
#time.sleep(1) # I/O-bound
return "{} property".format(host)
그리고 이런 모든 것을 실행 :
if __name__ == "__main__":
start = time.time()
fetch_hosts(['a', 'b', 'c', 'd', 'e'])
end = time.time()
print("Total time: {}".format(end-start))
이 time.sleep
사용을, 출력은 다음과 같습니다
Fetching hosts for d
Fetching hosts for a
Fetching hosts for c
Fetching hosts for b
Fetching hosts for e
Total time: 25.051292896270752
은 Using sum(range(33000000))
계산은 성능이 훨씬 나쁩니다.
계산이 내 노트북에 약 1 초 걸립니다
참고 :
>>> timeit.timeit("sum(range(33000000))", number=1)
1.023313045501709
>>> timeit.timeit("sum(range(33000000))", number=1)
1.029937982559204
그래서 각 작업자는 초 정도 소요됩니다. 하지만 하나가 CPU 바인딩되어 있고 GIL의 영향을 받기 때문에 스레드가 끔찍하게 수행됩니다.
여기 ProcessPoolExecutor
가 time.sleep
을 사용하고 :
Fetching hosts for a
Fetching hosts for b
Fetching hosts for c
Fetching hosts for d
Fetching hosts for e
Total time: 25.169482469558716
지금 sum(range(33000000))
를 사용하여 :
Fetching hosts for a
Fetching hosts for b
Fetching hosts for c
Fetching hosts for d
Fetching hosts for e
Total time: 43.54587936401367
당신이 볼 수 있듯이, 성능 계산이보다 더 조금 걸립니다 아마 때문에 time.sleep
(보다 여전히 더 동안 두 번째 및 CPU 바인딩 작업은 랩톱에서 실행되는 다른 모든 제품과 경쟁해야합니다.), 여전히 스레드 버전보다 훨씬 뛰어납니다.
그러나 호스트의 수가 올라감에 따라 IPC의 비용이 상당히 줄어들 것으로 예상됩니다. 여기에 ThreadPoolExecutor
10000 호스트와 않지만, 아무것도하지 않는 작업자 프로세스 (그냥 반환) 방법은 다음과 같습니다
Fetching hosts for c
Fetching hosts for b
Fetching hosts for d
Fetching hosts for a
Fetching hosts for e
Total time: 9.535644769668579
이 ProcessPoolExecutor
에 비교 :
Fetching hosts for c
Fetching hosts for b
Fetching hosts for a
Fetching hosts for d
Fetching hosts for e
Total time: 36.59257411956787
그래서 ProcessPoolExecutor
과의 4 배 느린있다, 모든 발생 IPC의 비용으로
그럼이게 무슨 뜻입니까? 여러분의 최상의 성능은 ProcessPoolExecutor
을 사용하지만 IPC를 추가적으로 배치함으로써 한 번에 하나의 호스트를 보내는 것이 아니라 많은 양의 호스트를 자식 프로세스로 보내도록하는 것입니다.
(테스트되지 않은,하지만 당신에게 아이디어를 제공합니다) 이런 식으로 뭔가 : 당신이 제안
import time
import itertools
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor as Pool
def call_ws_1(group_id):
return list(range(10000))
def call_ws_2(hosts): # This worker now works on a list of hosts
host_results = []
for host in hosts:
host_results.append((host, "{} property".format(host))) # returns a list of (host, property) tuples
return host_results
def chunk_list(l):
chunksize = len(l) // 16 # Break the list into smaller pieces
it = [iter(l)] * chunksize
for item in itertools.zip_longest(*it):
yield tuple(filter(None, item))
def fetch_property(hosts):
with Pool(max_workers=4) as executor:
futs = []
for chunk in chunk_list(hosts):
futs.append(concurrent.futures.submit(call_ws_2, chunk))
for future in concurrent.futures.as_completed(futs):
try:
results = future.result()
except Exception as exp:
print("Got %s" % exp)
else:
for result in results:
host, property = result
# Save host and property to DB
def fetch_hosts(group_ids):
with Pool(max_workers=4) as executor:
future_to_grp_id = {executor.submit(call_ws_1, group_id): group_id for group_id in group_ids}
for future in concurrent.futures.as_completed(future_to_grp_id):
group_id = future_to_grp_id[future]
try:
hosts = future.result()#this is a list
except Exception as exp:
print("Got %s" % exp)
else:
print("Fetching hosts for {}".format(group_id))
fetch_property(hosts)
if __name__ == "__main__":
start = time.time()
fetch_hosts(['a', 'b', 'c', 'd', 'e'])
end = time.time()
print("Total time: {}".format(end-start))
이 솔루션은 매우 유용한 가이드 라인이었다. 나는 여전히 실험을하고 약간의 손잡이를 미세하게 조정하고 있지만 아주 좋은 출발을 가져 주셔서 감사합니다. –
미래에 대한 또 다른 기본적인 질문이 있습니다. 'fetch_property (hosts)'를 호출하는'else' 부분에서'fetch_hosts()'메소드에 대한 코드를 참조하십시오. 이걸 어떻게 부르는거야? 그것은 차단 호출 또는 비동기입니다. 호스트가 페치되면 그들은이 메쏘드에 전달 될 것이고 다음 호스트가 새로운 호스트 집합을 반환 할지라도 이전에'fetch_property'를 호출하지 않는 한 차단 될 것입니다. 또는 여러개의 비동기 호출이'fetch_property'에 만들어집니다 언제든지 호스트를 사용할 수 있습니다. –