2012-08-04 2 views
11

k8055 USB 인터페이스 보드에 간단한 웹 API를 제공하기 위해 Flask을 사용했습니다. 꽤 표준 getters와 퍼터, 그리고 플라스크 정말 내 인생을 훨씬 쉽게 만들었습니다.Python에서 Flask로 주기적 작업을 수행하는 방법

그러나 나는 유장이 발생할 때/근처에 상태 변경을 등록 할 수 있기를 원합니다.

예를 들어 보드에 버튼이 연결되어 있으면 특정 포트에 대해 API를 폴링 할 수 있습니다. 그러나 출력이 직접 출력을 반영하도록하려면 누군가가 API에 대해 이야기하고 있는지 여부에 관계없이 이렇게 할 수 있습니다.

while True: 
    board.read() 
    board.digital_outputs = board.digital_inputs 
    board.read() 
    time.sleep(1) 

매초마다 출력이 입력과 일치하도록 업데이트됩니다.

플라스크에서 이런 종류의 작업을 수행 할 수있는 방법이 있습니까? 나는 Twisted에서 비슷한 일을했지만 Flask는이 특별한 애플리케이션이 아직 포기하기에는 너무 편리하다. ...

고마워.

답변

11

간단한 작업을 위해 cron을 사용할 수 있습니다.

작업에 대한 플라스크 뷰를 생성하십시오.

# a separate view for periodic task 
@app.route('/task') 
def task(): 
    board.read() 
    board.digital_outputs = board.digital_inputs 

그런 다음 주기적으로

# cron task to run each minute 
0-59 * * * * run_task.sh 

run_task.sh 내용이 크론보다 한 번 더 자주 분을 실행할 수 없습니다

wget http://localhost/task 

이 경우 해당 URL에서 크론, 다운로드를 사용하여. 당신이 더 높은 주파수를해야하는 경우 (예를 들어, 각 오초 = 분당 12 회), 당신은 다음과 같은 방법으로 내 플라스크 응용 프로그램에 대한

# loop 12 times with a delay 
for i in 1 2 3 4 5 6 7 8 9 10 11 12 
do 
    # download url in background for not to affect delay interval much 
    wget -b http://localhost/task 
    sleep 5s 
done 
-1

아니요 Flask에서 작업이 지원되지 않지만 을 사용하거나 별도의 스레드 (greenlet)에서 함수를 실행할 수 있습니다.

+3

감사합니다. gevent/greenlet 경로를 따라 갔지만 'main'스레드가 루핑 스레드에 응답하지 않는 것으로 나타났습니다. (위의 시간 대신 gevent.sleep 사용) 셀러리의 경우 확실하게 메시지 대기열 서버 구현은 과도합니다. 그래서 '단순한'(tm) 무엇인가? – Bolster

1

에 tun_task.sh에서 작업을 수행해야합니다, 나는 설명 크론 접근 방식을 사용하여 고려 Pashka에 의해 answer, schedule 라이브러리 및 APScheduler이 있습니다.

APScheduler가 단순하고주기적인 작업 실행 목적을 제공하는 것으로 나타 났으므로 APScheduler를 계속 사용했습니다.

예제 코드 : 귀하의 제안에 대한

from flask import Flask, jsonify 

from apscheduler.schedulers.background import BackgroundScheduler 


app = Flask(__name__) 

def test_job(): 
    print('I am working...') 

scheduler = BackgroundScheduler() 
job = scheduler.add_job(test_job, 'interval', minutes=1) 
scheduler.start() 
관련 문제