2017-04-11 8 views
0

DB에서 행 당 하나씩 수백만 개의 셀러리 작업을 생성하는 스크립트가 있습니다. 셀러리를 완전히 범람하지 못하게 막을 수있는 방법이 있습니까?셀러리 작업을 소비하는 것보다 빠르게 만드는 스크립트를 조절하는 방법은 무엇입니까?

이상적으로 저는 셀러리를 바쁘게하고 싶습니다만, 셀러리 대기열의 길이가 단지 수십 가지 작업을 초과하기를 원하지 않습니다. (특히 메모리를 낭비하는 것이므로 스크립트가 수백만 거의 즉시 작업 대기열로).

답변

0

지난 며칠 동안이 문제에 약간의 시간을 할애하여 CeleryThrottle 개체를 생각해 냈습니다. 기본적으로 큐에 넣고 자하는 아이템의 수를 알려주고 그 크기와 그 크기 사이에 큐를 유지하는 것이 최선입니다.

그래서 여기에 코드입니다 (레디 스 브로커를 가정하지만, 쉽게 변경) :

throttle = CeleryThrottle() 
for item in really_big_list_of_items: 
    throttle.maybe_wait() 
    my_task.delay(item) 

아주 간단하고 잘하면 꽤 유연 :

# coding=utf-8 
from collections import deque 

import time 

import redis 
from django.conf import settings 
from django.utils.timezone import now 


def get_queue_length(queue_name='celery'): 
    """Get the number of tasks in a celery queue. 

    :param queue_name: The name of the queue you want to inspect. 
    :return: the number of items in the queue. 
    """ 
    r = redis.StrictRedis(
     host=settings.REDIS_HOST, 
     port=settings.REDIS_PORT, 
     db=settings.REDIS_DATABASES['CELERY'], 
    ) 
    return r.llen(queue_name) 


class CeleryThrottle(object): 
    """A class for throttling celery.""" 

    def __init__(self, min_items=100, queue_name='celery'): 
     """Create a throttle to prevent celery run aways. 

     :param min_items: The minimum number of items that should be enqueued. 
     A maximum of 2× this number may be created. This minimum value is not 
     guaranteed and so a number slightly higher than your max concurrency 
     should be used. Note that this number includes all tasks unless you use 
     a specific queue for your processing. 
     """ 
     self.min = min_items 
     self.max = self.min * 2 

     # Variables used to track the queue and wait-rate 
     self.last_processed_count = 0 
     self.count_to_do = self.max 
     self.last_measurement = None 
     self.first_run = True 

     # Use a fixed-length queue to hold last N rates 
     self.rates = deque(maxlen=15) 
     self.avg_rate = self._calculate_avg() 

     # For inspections 
     self.queue_name = queue_name 

    def _calculate_avg(self): 
     return float(sum(self.rates))/(len(self.rates) or 1) 

    def _add_latest_rate(self): 
     """Calculate the rate that the queue is processing items.""" 
     right_now = now() 
     elapsed_seconds = (right_now - self.last_measurement).total_seconds() 
     self.rates.append(self.last_processed_count/elapsed_seconds) 
     self.last_measurement = right_now 
     self.last_processed_count = 0 
     self.avg_rate = self._calculate_avg() 

    def maybe_wait(self): 
     """Stall the calling function or let it proceed, depending on the queue. 

     The idea here is to check the length of the queue as infrequently as 
     possible while keeping the number of items in the queue as closely 
     between self.min and self.max as possible. 

     We do this by immediately enqueueing self.max items. After that, we 
     monitor the queue to determine how quickly it is processing items. Using 
     that rate we wait an appropriate amount of time or immediately press on. 
     """ 
     self.last_processed_count += 1 
     if self.count_to_do > 0: 
      # Do not wait. Allow process to continue. 
      if self.first_run: 
       self.first_run = False 
       self.last_measurement = now() 
      self.count_to_do -= 1 
      return 

     self._add_latest_rate() 
     task_count = get_queue_length(self.queue_name) 
     if task_count > self.min: 
      # Estimate how long the surplus will take to complete and wait that 
      # long + 5% to ensure we're below self.min on next iteration. 
      surplus_task_count = task_count - self.min 
      wait_time = (surplus_task_count/self.avg_rate) * 1.05 
      time.sleep(wait_time) 

      # Assume we're below self.min due to waiting; max out the queue. 
      if task_count < self.max: 
       self.count_to_do = self.max - self.min 
      return 

     elif task_count <= self.min: 
      # Add more items. 
      self.count_to_do = self.max - task_count 
      return 

사용 방법처럼 보인다. 이를 통해 코드는 큐를 모니터링하고 큐가 너무 길어지면 루프에 대기를 추가합니다. 업데이트가있는 경우에 대비하여 our github repo에 있습니다.

이렇게하면 작업의 롤링 평균 속도가 추적되고 필요 이상으로 큐 길이를 확인하지 않습니다. 예를 들어 작업을 실행하는 데 2 ​​분이 걸리는 경우 대기열에 100 개의 항목을 넣은 후에 대기열 길이를 다시 확인하지 않아도되기까지 꽤 오래 기다릴 수 있습니다. 이 스크립트의 더 단순한 버전은 매번 루프를 통해 대기열 길이를 확인할 수 있지만 불필요한 지연을 추가합니다. 이 버전은 때로는 잘못 (이 경우 대기열이 min_items 아래로가는 경우)의 대가를 치르면서 그것에 대해 현명하려고합니다.

관련 문제