2017-01-30 1 views
4

다른 API 목록을 반환하는 API가 있습니다.다른 셀러 리 작업이 작동하지 않는 셀러리 정기 작업

15 분마다 이러한 API에 액세스하여 데이터를 데이터베이스에 반환해야합니다.

다음은 셀러리 및 redis를 사용하여 celery_worker.py 파일에 기록한 내용입니다. 그러나 모든 작업이 시작되지 않습니다.

list_of_APIs = requests.get(the_api_that_returns_list_of_APIs).json() 

CELERYBEAT_SCHEDULE = { 
    'every-15-minute': { 
     'task': 'fetch_data_of_all_APIs', 
     'schedule': timedelta(minutes=15), 
    }, 
} 

@celery.task 
def access_one_API(one_API): 
    return requests.get(one_API).json() 

@celery.task(name='fetch_data_of_all_APIs') 
def fetch_data_of_all_APIs(): 
    for one_API in list_of_APIs: 
      task = access_one_API.delay(one_API) 
      # some codes to put all task.id into a list_of_task_id 

    for task_id in list_of_task_id: 
      # some codes to get the results of all tasks 
      # some codes to put all the results into a database 

fetch_data_of_all_APIs 기능은 access_one_API 기능

셀러리 서버가 성공적으로 터미널에서 시작하지만 어느 fetch_data_of_all_APIsaccess_one_API 시작을 실행하는 다수의 근로자를 사용하도록되어 15 분마다 실행해야합니다.

fetch_data_of_all_APIs 함수 내에서 코드를 추출하면 access_one_API이 여러 셀러리 작업자에 의해 시작되고 실행될 수 있습니다. 그러나 이러한 코드를 함수 내에 넣고 @celery.task으로 꾸미면 두 함수가 모두 시작되지 않습니다.

그래서 나는 그것이 셀러리와 관련이 있어야한다고 생각합니다.

미리 감사드립니다.

+0

'@ celery.task()'데코레이터가 필요합니다. 또한, 현재 샐러리 버전이 소문자로 설정되어 있으므로'celery-beat' 설정 매개 변수를 확인해야합니다. –

답변

0

여기 예제에서는 서브 태스크를 사용하여주기 태스크를 셀리에서 구성하는 방법을 설명합니다 (데모 용으로 20 초 설정). tasks.py :

import celery 
from celery.canvas import subtask 
from celery.result import AsyncResult 
# just for example list of integer values 
list_of_APIs = [1, 2, 3, 4] 


@celery.task(name='access_one_API') 
def access_one_API(api): 
    """ 
    Sum of subtask for demonstration 
    :param int api: 
    :return: int 
    """ 
    return api + api 


@celery.task(name='fetch_data_of_all_APIs') 
def fetch_data_of_all_APIs(list_of_APIs): 
    list_task_ids = [] 

    for api in list_of_APIs: 
     # run of celery subtask and collect id's of subtasks 
     task_id = subtask('access_one_API', args=(api,)).apply_async().id 
     list_task_ids.append(task_id) 

    result_sub_tasks = {} 

    for task_id in list_task_ids: 
     while True: 
      task_result = AsyncResult(task_id) 
      if task_result.status == 'SUCCESS': 
       # if subtask is finish add result and check result of next subtask 
       result_sub_tasks[task_id] = task_result.result 

       break 

    print result_sub_tasks 
    # do something with results of subtasks here... 


app = celery.Celery(
    'tasks', 
    broker='redis://localhost:6379/0', 
    backend='redis://localhost:6379/0', 
) 


app.conf.beat_schedule = { 
    'add-every-20-seconds': { 
     'task': 'fetch_data_of_all_APIs', 
     'schedule': 20.0, 
     # args for fetch_data_of_all_APIs 
     'args': (list_of_APIs,) 
    }, 
} 

실행 셀러리 : 터미널에서 celery worker -A tasks.app --loglevel=info --beat

추적 :이 도움이

[2017-03-14 10:31:36,361: WARNING/PoolWorker-3] {'929996b3-fc86-4274-b3c3-06c38a6d4edd': 6, 'f44456b4-df93-4a78-9f1d-b2c2d2b05322': 4, '4e44fd57-fbbc-43cd-8616-1eafef559417': 8, '6d943f35-0d74-4319-aa02-30a266aa3cd9': 2} 

희망.

관련 문제