2014-10-02 3 views
1

작업 할 큰 목록 L이 있습니다. f()는 L에서 작동하는 함수입니다. f()는 15 분마다 만료되고 갱신되어야하는 다른 변수를 사용합니다. 여기 직렬, 예입니다타이머 처리 대상 다중 처리

F 이후
def main(): 
    L = openList() 
    # START THE CLOCK 
    clockStart = dt.datetime.now() 
    clockExp = clockStart + dt.timedelta(seconds=900) 
    a = getRenewed() 
    for item in L: 
     f(item, a) # operate on item given a 
     # CHECK TIME REMAINING 
     clockCur = dt.datetime.now() 
     clockRem = (clockExp - clockCur).total_seconds() 
     # RENEW a IF NEEDED 
     if clockRem < 5: # renew with 5 seconds left 
      clockStart = dt.datetime.now() 
      clockExp = clockStart + dt.timedelta(seconds=900) 
      a = getRenewed() 

()는 몇 초 정도 걸립니다 (이상 가끔), 나는 코드를 병렬화하고 싶습니다. 주어진 타이머를 수행하는 방법에 대한 팁? clockExp와 "a"를 공유하는 것을 상상해 보았습니다. 프로세스가 clockRem < 5를 만족하면 getRenewed()를 호출하고 새로운 "a"와 clockExp를 공유하고 반복합니다.

+0

'f' 또는'getRenewed'는 프로세스 특정 상태에 의존합니까? 아니면 외부 상태에만 의존합니까? – Blckknght

+0

f. L. getRenewed의 항목에 따라 웹 사이트를 다운로드하면 인증 토큰을 얻습니다. – Kevin

답변

3

부작용없이 여러 번 호출 할 수있는 경우 (즉, 부작용없이 여러 번 호출 할 수 있음) 기존 타이머 코드를 작업자 프로세스로 옮기고 자신의 타이머가 있음을 알 때마다 한 번 호출하도록 할 수 있습니다 달려라. 이것은 단지 당신이 통과 목록에서 항목에 대한 동기화를 필요로하고, multiprocessing.Pool는 충분히 쉽게 처리 할 수 ​​있습니다 : 멱등되지

def setup_worker(): 
    global clockExp, a 

    clockStart = dt.datetime.now() 
    clockExp = clockStart + dt.timedelta(seconds=900) 
    a = getRenewed() 

def worker(item): 
    global clockExp, a 

    clockCur = dt.datetime.now() 
    clockRem = (clockExp - clockCur).total_seconds() 

    if clockRem < 5: # renew with 5 seconds left 
     clockStart = dt.datetime.now() 
     clockExp = clockStart + dt.timedelta(seconds=900) 
     a = getRenewed() 

    f(item, a) 

def main(L): 
    pool = multiprocessing.Pool(initializer=setup_worker) 

    pool.map(worker, L) 

getRenewed 경우 상황이 조금 더 복잡 할 필요가있을 것이다. 각 작업자 프로세스에서 호출 할 수 없으므로 프로세스간에 통신 방법을 설정해야 사용할 수있는 최신 버전을 얻을 수 있습니다.

multiprocessing.queue을 사용하여 주 프로세스의 값 a을 작업자에게 전달하는 것이 좋습니다. 목록 항목에 여전히 Pool을 사용할 수 있습니다. 주 프로세스에서 비동기 적으로 사용해야합니다. 아마도이 같은 :

노동자는 여전히 하나의 작업자가 하나의 대기열을 보낸 a 값이 소비 수있는 위치 그렇지 않으면 당신은 경쟁 조건에 직면 것 때문에, 어떤 타이밍 코드를 가지고 있어야합니다
def setup_worker2(queue): 
    global x 
    x = random.random() 
    global a_queue, a, clockExp 

    a_queue = queue 
    a = a_queue.get() # wait for the first `a` value 
    clockStart = dt.datetime.now() 
    clockExp = clockStart + dt.timedelta(seconds=900) 

def worker2(item): 
    global a, clockExp 

    clockCur = dt.datetime.now() 
    clockRem = (clockExp - clockCur).total_seconds() 
    if clockRem < 60: # start checking for a new `a` value 60 seconds before its needed 
     try: 
      a = a_queue.get_nowait() 
      clockStart = dt.datetime.now() 
      clockExp = clockStart + dt.timedelta(seconds=900) 
     except queue.Empty: 
      pass 

    return f(item, a) 

def main2(L): 
    queue = multiprocessing.Queue()  # setup the queue for the a values 

    pool = multiprocessing.Pool(initializer=setup_worker2, initargs=(queue,)) 

    result = pool.map_async(worker2, L) # send the items to the pool asynchronously 

    while True:     # loop for sending a values through the queue 
     a = getRenewed()   # get a new item 
     for _ in range(os.cpu_count()): 
      queue.put(a)   # send one copy per worker process 

     try: 
      result.wait(900-5) # sleep for ~15 minutes, or until the result is ready 
     except multiprocessing.TimeoutError: 
      pass     # if we got a timeout, keep looping! 
     else: 
      break     # if not, we are done, so break out of the loop! 

메인 프로세스에서 배치. f에 대한 호출 중 일부가 다른 것보다 현저히 느린 경우 (웹에서 다운로드하는 것과 관련이있을 수 있음) 발생할 수 있습니다.

+0

팁 주셔서 감사. 여러 개의 활성 토큰을 가질 수 있으므로 첫 번째 솔루션이 작동합니다. – Kevin