2014-07-23 1 views
2

특정 group_id에 해당하는 호스트 목록을 가져 오는 python 스크립트를 작성하고 있습니다. 같은 것을 가져 오기 위해 웹 서비스 호출을 사용할 것이다. 호스트의 수는 10,000 개가 될 수 있습니다. 이제 각 호스트에 대해 다른 웹 서비스에서 property라는 값을 가져옵니다.
그래서 그룹 ID는 ---- (WS1)은 ----- 호스트 젯 10000s - 나는 concurrent.futures을 사용하고 각Python : 웹 서비스에서 데이터를 가져 오기위한 병렬 비동기 호출의 효율성 문제

에 대한 (WS2) ---- 속성은 다음 코드와 같이 . 그러나 그것은 깨끗한 디자인으로 보이지 않으며 잘 확장되지 않을 것입니다.

def call_ws_1(group_id): 
    #fetch list of hosts for group_id 


def call_ws_2(host): 
    #find property for host 


def fetch_hosts(group_ids): 
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: 
     future_to_grp_id = {executor.submit(call_ws_1, group_id): group_id for group_id in group_ids} 
     for future in concurrent.futures.as_completed(future_to_grp_id): 
      group_id = future_to_grp_id[future] 
      try: 
       hosts = future.result()#this is a list 
      except Exception as exp: 
       #logging etc 
      else: 
       fetch_property(hosts) 


def fetch_property(hosts): 
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: 
     future_to_host = {executor.submit(call_ws_2, host): host for host in hosts} 
     for future in concurrent.futures.as_completed(future_to_host): 
      host = future_to_host[future] 
      try: 
       host_prop = future.result()#String 
      except Exception as exp: 
       #logging etc 
      else: 
       #Save host and property to DB 
  1. 는 ProcessPoolExecuter를 사용하는 이점이있을 것인가?
  2. 먼저 모든 호스트를 가져 와서 (약 40000 개) ws를 호출하여 속성을 가져 오는 방법
  3. 이 디자인을 개선하기위한 다른 제안이 있으십니까?

답변

2
  1. ProcessPoolExecutor GIL은 영향을받지 않는 장점을 갖는다. ThreadPoolExecutor을 사용하면 I/O를 수행하지 않는 한 GIL이 한 번에 둘 이상의 스레드를 실행하지 못하게합니다. 다행스럽게도 두 스레드가 주로 I/O를 수행하는 것처럼 보이지만 웹 서비스 호출 전후에 각 스레드에서 발생하는 모든 종류의 처리가 실제로 동시에 발생하지 않으므로 성능이 저하됩니다. ProcessPoolExecutor에는 이러한 제한이 없지만 프로세스간에 데이터가 group_idhost 인 추가 오버 헤드가 있습니다. 수만개의 호스트가있는 경우 프로세스간에 이들 호스트를 한 번에 보내는 것은 상당한 오버 헤드가 발생할 것입니다.

  2. 이 변경 사항만으로도 성능이 너무 많이 변하지 않을 것이라고 생각합니다. 결국에는 각 호스트를 한 번에 하나씩 처리 스레드로 보내려고하기 때문입니다.

숫자 3은 작업자 스레드가 실제로 I/O를 거의 수행하지 않는 경우이 방법이 정상적으로 작동 할 수 있습니다. 그러나 스레드를 사용하면 작업자가 수행하는 CPU 관련 작업이 모두 성능을 저하시킬 수 있습니다. 나는 정확한 프로그램 레이아웃을 가져다가이처럼 두 노동자를 구현 :

def call_ws_1(group_id): 
    return list(range(20)) 

def call_ws_2(host): 
    sum(range(33000000)) # CPU-bound 
    #time.sleep(1) # I/O-bound 
    return "{} property".format(host) 

그리고 이런 모든 것을 실행 :

if __name__ == "__main__": 
    start = time.time() 
    fetch_hosts(['a', 'b', 'c', 'd', 'e']) 
    end = time.time() 
    print("Total time: {}".format(end-start)) 

time.sleep 사용을, 출력은 다음과 같습니다

Fetching hosts for d 
Fetching hosts for a 
Fetching hosts for c 
Fetching hosts for b 
Fetching hosts for e 
Total time: 25.051292896270752 

은 Using sum(range(33000000)) 계산은 성능이 훨씬 나쁩니다.

계산이 내 노트북에 약 1 초 걸립니다

참고 :

>>> timeit.timeit("sum(range(33000000))", number=1) 
1.023313045501709 
>>> timeit.timeit("sum(range(33000000))", number=1) 
1.029937982559204 

그래서 각 작업자는 초 정도 소요됩니다. 하지만 하나가 CPU 바인딩되어 있고 GIL의 영향을 받기 때문에 스레드가 끔찍하게 수행됩니다.

여기 ProcessPoolExecutortime.sleep을 사용하고 :

Fetching hosts for a 
Fetching hosts for b 
Fetching hosts for c 
Fetching hosts for d 
Fetching hosts for e 
Total time: 25.169482469558716 

지금 sum(range(33000000))를 사용하여 :

Fetching hosts for a 
Fetching hosts for b 
Fetching hosts for c 
Fetching hosts for d 
Fetching hosts for e 
Total time: 43.54587936401367 

당신이 볼 수 있듯이, 성능 계산이보다 더 조금 걸립니다 아마 때문에 time.sleep (보다 여전히 더 동안 두 번째 및 CPU 바인딩 작업은 랩톱에서 실행되는 다른 모든 제품과 경쟁해야합니다.), 여전히 스레드 버전보다 훨씬 뛰어납니다.

그러나 호스트의 수가 올라감에 따라 IPC의 비용이 상당히 줄어들 것으로 예상됩니다. 여기에 ThreadPoolExecutor 10000 호스트와 않지만, 아무것도하지 않는 작업자 프로세스 (그냥 반환) 방법은 다음과 같습니다

Fetching hosts for c 
Fetching hosts for b 
Fetching hosts for d 
Fetching hosts for a 
Fetching hosts for e 
Total time: 9.535644769668579 

ProcessPoolExecutor에 비교 :

Fetching hosts for c 
Fetching hosts for b 
Fetching hosts for a 
Fetching hosts for d 
Fetching hosts for e 
Total time: 36.59257411956787 

그래서 ProcessPoolExecutor과의 4 배 느린있다, 모든 발생 IPC의 비용으로

그럼이게 무슨 뜻입니까? 여러분의 최상의 성능은 ProcessPoolExecutor을 사용하지만 IPC를 추가적으로 배치함으로써 한 번에 하나의 호스트를 보내는 것이 아니라 많은 양의 호스트를 자식 프로세스로 보내도록하는 것입니다.

(테스트되지 않은,하지만 당신에게 아이디어를 제공합니다) 이런 식으로 뭔가 : 당신이 제안

import time 
import itertools 
import concurrent.futures 
from concurrent.futures import ProcessPoolExecutor as Pool 

def call_ws_1(group_id): 
    return list(range(10000)) 

def call_ws_2(hosts): # This worker now works on a list of hosts 
    host_results = [] 
    for host in hosts: 
     host_results.append((host, "{} property".format(host))) # returns a list of (host, property) tuples 
    return host_results 

def chunk_list(l): 
    chunksize = len(l) // 16 # Break the list into smaller pieces 
    it = [iter(l)] * chunksize 
    for item in itertools.zip_longest(*it): 
     yield tuple(filter(None, item)) 

def fetch_property(hosts): 
    with Pool(max_workers=4) as executor: 
     futs = [] 
     for chunk in chunk_list(hosts): 
      futs.append(concurrent.futures.submit(call_ws_2, chunk)) 
     for future in concurrent.futures.as_completed(futs): 
      try: 
       results = future.result() 
      except Exception as exp: 
       print("Got %s" % exp) 
      else: 
       for result in results: 
        host, property = result 
        # Save host and property to DB 

def fetch_hosts(group_ids): 
    with Pool(max_workers=4) as executor: 
     future_to_grp_id = {executor.submit(call_ws_1, group_id): group_id for group_id in group_ids} 
     for future in concurrent.futures.as_completed(future_to_grp_id): 
      group_id = future_to_grp_id[future] 
      try: 
       hosts = future.result()#this is a list 
      except Exception as exp: 
       print("Got %s" % exp) 
      else: 
       print("Fetching hosts for {}".format(group_id)) 
       fetch_property(hosts) 

if __name__ == "__main__": 
    start = time.time() 
    fetch_hosts(['a', 'b', 'c', 'd', 'e']) 
    end = time.time() 
    print("Total time: {}".format(end-start)) 
+0

이 솔루션은 매우 유용한 가이드 라인이었다. 나는 여전히 실험을하고 약간의 손잡이를 미세하게 조정하고 있지만 아주 좋은 출발을 가져 주셔서 감사합니다. –

+0

미래에 대한 또 다른 기본적인 질문이 있습니다. 'fetch_property (hosts)'를 호출하는'else' 부분에서'fetch_hosts()'메소드에 대한 코드를 참조하십시오. 이걸 어떻게 부르는거야? 그것은 차단 호출 또는 비동기입니다. 호스트가 페치되면 그들은이 메쏘드에 전달 될 것이고 다음 호스트가 새로운 호스트 집합을 반환 할지라도 이전에'fetch_property'를 호출하지 않는 한 차단 될 것입니다. 또는 여러개의 비동기 호출이'fetch_property'에 만들어집니다 언제든지 호스트를 사용할 수 있습니다. –

관련 문제