2017-05-18 1 views
0

을 얻는 방법을 나는 셀러리 비트를 사용하여 주기적 작업을 설정합니다. 작업이 실행되고 콘솔에서 결과를 볼 수 있습니다. 작업에 의해 던져진 결과를 다시 생각하는 파이썬 스크립트를 갖고 싶습니다.셀러리 작업 ID

이 같은 그것을 할 수 :

#client.py 
from cfg_celery import app 
task_id = '337fef7e-68a6-47b3-a16f-1015be50b0bc' 
try: 
    x = app.AsyncResult(id) 
    print(x.get()) 
except: 
    print('some error') 

을 어쨌든, 당신이 볼 수 있듯이,이 테스트를 위해 나는 셀러리 비트 콘솔 (그렇게 말하는)에 던져 task_id을 복사하고 그것을 하드 코딩했다 내 스크립트. 분명히 이것은 실제 생산에서 작동하지 않을 것입니다. 나는 이런 식으로 읽을 수

#cfg_celery.py 
app = Celery('celery_config', 
     broker='redis://localhost:6379/0', 
     include=['taskos'], 
     backend = 'redis' 
     ) 
app.conf.beat_schedule = { 
    'something': { 
     'task': 'tasks.add', 
     'schedule': 10.0, 
     'args': (16, 54), 
     'options' : {'task_id':"my_custom_id"}, 
    } 
} 

이 방법 :

#client.py 
from cfg_celery import app 
task_id = 'my_custom_id' 
try: 
    x = app.AsyncResult(id) 
    print(x.get()) 
except: 
    print('some error') 

이 방법의 문제는 내가 이전을 잃게

나는 그것이 셀러리 설정 파일에 task_id 설정 해킹 결과 (client.py 전화 이전).

나는 셀러리 백엔드에서 task_id 's의 목록을 읽을 수있는 몇 가지 방법이 있나요? 둘 이상의 주기적 작업이있는 경우 각각의 주기적 작업에서 task_id의 목록을 얻을 수 있습니까? 이렇게하려면 app.tasks.key()을 사용할 수 있습니까?

PD : 내가 잘못 몇 가지 용어를 사용하는 경우 - 영어 - 네이티브 플러스 셀러리에 새로운 것이 아니다, 좋은.

+0

나는 (내가 다른 파이썬 인스턴스로부터 결과를 얻고 싶다) 나는 값을 저장하고 검색하기 위해 redis 함수를 사용해야한다는 결론에 도달했다. 내가 zadd를 사용하여 결과를 redis에 쓰고 'client.py'에 zrange를 사용하여 검색합니다. –

답변

0

확인을 클릭합니다. 어려운 일이거나 내 질문이 너무 바보이기 때문에 누구도이 질문에 답하지 못했습니다. 어쨌든, 내가하고 싶었던 것은 다른 파이썬 프로세스에서 나온 '셀러리 - 비트'작업의 결과를 얻는 것입니다. 동일한 프로세스에 있었기 때문에 아무 문제없이 작업 ID에 액세스 할 수 있었으며 모든 것이 쉽게 수행되었습니다. 그러나 다른 프로세스에서 완성 된 작업의 목록을 검색하는 방법을 찾지 못했습니다.

나는 파이썬 RQ 시도 (이 좋은)하지만 난 RQ를 사용하는 것을보고 나도 내가 수동으로 레디 스 저장 기능을 사용했습니다 것을 이해하는 것이 왔다고 할 수 없었다.

: 그래서 나는이 일을, 내가 원하는 것을 얻었다. 'bind = True'를 사용하면 태스크 함수 내에서 스 누울 수 있습니다. . 함수의 결과를 얻은 후에는 redis로 목록에 씁니다 (이 목록의 크기를 제한하기 위해 트릭을했습니다) . 이제 독립 프로세스에서 동일한 redis 서버에 연결하여 해당 목록에 저장된 결과를 검색 할 수 있습니다.

cfg_celery.py :

내 파일은 다음과 같이 었죠 여기에 내가이 작업을 호출하려고하는 방법을 정의합니다.

#cfg_celery.py 
from celery import Celery 

appo = Celery('celery_config', 
     broker='redis://localhost:6379/0', 
     include=['taskos'], 
     backend = 'redis' 
     ) 

''' 
urlea se decoro como periodic_task. no hay necesidad de darla de alta aqi. 
pero como add necesita args, la doy de alta manualmente p pasarselos 
''' 
appo.conf.beat_schedule = { 
    'q_loco': { 
     'task': 'taskos.add', 
     'schedule': 10.0, 
     'args': (16, 54), 
     # 'options' : {'task_id':"lcura"}, 
    } 
} 

taskos.py : 이것들이 작업입니다.

#taskos.py 
from cfg_celery import appo 
from celery.decorators import periodic_task 
from redis import Redis 

from datetime import timedelta 
import requests, time 

rds = Redis() 

@appo.task(bind=True) 
def add(self,a, b): 
    #result of operation. very dummy. 
    result = a + b 

    #storing in redis 
    r= (self.request.id,time.time(),result) 
    rds.lpush('my_results',r) 

    # for this test i want to have at most 5 results stored in redis 
    long = rds.llen('my_results') 
    while long > 5: 
     x = rds.rpop('my_results') 
     print('popping out',x) 
     long = rds.llen('my_results') 
     time.sleep(1) 
    return a + b 


@periodic_task(run_every=20) 
def urlea(url='https://www.fullstackpython.com/'): 
    inicio = time.time() 
    R = dict() 
    try: 
     resp = requests.get(url) 
     R['vato'] = url+" = " + str(resp.status_code*10) 
     R['num palabras'] = len(resp.text.split()) 
    except: 
     R['vato'] = None 
     R['num palabras'] = 0   
    print('u {} : {}'.format(url,time.time()-inicio)) 
    time.sleep(0.8) # truco pq se vea mas claramente la dif. 
    return R 

consumer.py : 결과를 얻을 수있는 독립 프로세스.

#consumer.py 
from redis import Redis 
nombre_lista = 'my_results' 

rds = Redis() 

tamaño = rds.llen(nombre_lista) 
ultimos_resultados = list() 
for i in range(tamaño): 
    ultimos_resultados.append(rds.rpop(nombre_lista)) 

print(ultimos_resultados) 

나는 프로그래밍 비교적 새로운 오전 나는이 대답은 나 같은 noobs에 도움을 줄 수 있기를 바랍니다. 내가 잘못한 것이 있으면 필요에 따라 수정을하십시오.