Mam hosting Flask bez dostępu do cron
poleceń.
Jak mogę co godzinę wykonywać jakąś funkcję Pythona?
Można skorzystać BackgroundScheduler()
z APScheduler opakowania (v3.5.3):
import time
import atexit
from apscheduler.schedulers.background import BackgroundScheduler
def print_date_time():
print(time.strftime("%A, %d. %B %Y %I:%M:%S %p"))
scheduler = BackgroundScheduler()
scheduler.add_job(func=print_date_time, trigger="interval", seconds=3)
scheduler.start()
# Shut down the scheduler when exiting the app
atexit.register(lambda: scheduler.shutdown())
Zauważ, że dwa z tych programów planujących zostaną uruchomione, gdy Flask będzie w trybie debugowania. Aby uzyskać więcej informacji, sprawdź to pytanie.
flask
miałApp.runonce
lubApp.runForNseconds
mógłbyś przełączać się międzyschedule
aMożesz wykorzystać
APScheduler
w swojej aplikacji Flask i uruchamiać zadania za pośrednictwem jego interfejsu:import atexit # v2.x version - see https://stackoverflow.com/a/38501429/135978 # for the 3.x version from apscheduler.scheduler import Scheduler from flask import Flask app = Flask(__name__) cron = Scheduler(daemon=True) # Explicitly kick off the background thread cron.start() @cron.interval_schedule(hours=1) def job_function(): # Do your work here # Shutdown your cron thread if the web process is stopped atexit.register(lambda: cron.shutdown(wait=False)) if __name__ == '__main__': app.run()
źródło
lambda
zasięguatexit.register
?atexit.register
potrzebuje funkcji do wywołania. Gdybyśmy właśnie zdalicron.shutdown(wait=False)
, przekazalibyśmy wynik wywołaniacron.shutdown
(co jest prawdopodobnieNone
). Zamiast tego przekazujemy funkcję o zerowym argumencie i zamiast nadawać jej nazwę i używać instrukcji,def shutdown(): cron.shutdown(wait=False)
aatexit.register(shutdown)
zamiast tego rejestrujemy ją w liniilambda:
(co jest wyrażeniem funkcji o zerowym argumencie ).Jestem trochę nowy w koncepcji harmonogramów aplikacji, ale to, co znalazłem tutaj dla APScheduler v3.3.1 , jest trochę inne. Uważam, że w najnowszych wersjach zmieniła się struktura pakietów, nazwy klas itp., Dlatego zamieszczam tutaj świeże rozwiązanie, które wykonałem niedawno, zintegrowane z podstawową aplikacją Flaska:
#!/usr/bin/python3 """ Demonstrating Flask, using APScheduler. """ from apscheduler.schedulers.background import BackgroundScheduler from flask import Flask def sensor(): """ Function for test purposes. """ print("Scheduler is alive!") sched = BackgroundScheduler(daemon=True) sched.add_job(sensor,'interval',minutes=60) sched.start() app = Flask(__name__) @app.route("/home") def home(): """ Function for test purposes. """ return "Welcome Home :) !" if __name__ == "__main__": app.run()
Zostawiam tutaj również streszczenie , jeśli ktoś jest zainteresowany aktualizacjami dla tego przykładu.
Oto kilka odniesień do przyszłych lektur:
źródło
apscheduler.schedulers.background
, ponieważ może się zdarzyć, że napotkasz inne przydatne scenariusze dla Twojej aplikacji. Pozdrowienia.Możesz spróbować użyć BackgroundScheduler APScheduler, aby zintegrować zadanie interwałowe z aplikacją Flask. Poniżej znajduje się przykład wykorzystujący blueprint i fabrykę aplikacji ( init .py):
from datetime import datetime # import BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler from flask import Flask from webapp.models.main import db from webapp.controllers.main import main_blueprint # define the job def hello_job(): print('Hello Job! The time is: %s' % datetime.now()) def create_app(object_name): app = Flask(__name__) app.config.from_object(object_name) db.init_app(app) app.register_blueprint(main_blueprint) # init BackgroundScheduler job scheduler = BackgroundScheduler() # in your case you could change seconds to hours scheduler.add_job(hello_job, trigger='interval', seconds=3) scheduler.start() try: # To keep the main thread alive return app except: # shutdown if app occurs except scheduler.shutdown()
Mam nadzieję, że to pomoże :)
Odniesienie:
źródło
Aby uzyskać proste rozwiązanie, możesz dodać trasę, taką jak
@app.route("/cron/do_the_thing", methods=['POST']) def do_the_thing(): logging.info("Did the thing") return "OK", 200
Następnie dodaj zadanie unix cron, które okresowo POSTuje na tym punkcie końcowym. Na przykład, aby uruchamiać go raz na minutę, w typie terminala
crontab -e
i dodać tę linię:(Zwróć uwagę, że ścieżka do zwijania musi być kompletna, ponieważ po uruchomieniu zadania nie będzie mieć Twojej ŚCIEŻKI. Pełną ścieżkę do zwijania w systemie można znaleźć, klikając
which curl
)Podoba mi się to, że łatwo jest przetestować zadanie ręcznie, nie ma dodatkowych zależności, a ponieważ nie dzieje się nic specjalnego, łatwo jest to zrozumieć.
Bezpieczeństwo
Jeśli chcesz zabezpieczyć hasłem swoje zadanie cron, możesz
pip install Flask-BasicAuth
, a następnie dodać poświadczenia do konfiguracji aplikacji:app = Flask(__name__) app.config['BASIC_AUTH_REALM'] = 'realm' app.config['BASIC_AUTH_USERNAME'] = 'falken' app.config['BASIC_AUTH_PASSWORD'] = 'joshua'
Aby zabezpieczyć hasłem punkt końcowy zadania:
from flask_basicauth import BasicAuth basic_auth = BasicAuth(app) @app.route("/cron/do_the_thing", methods=['POST']) @basic_auth.required def do_the_thing(): logging.info("Did the thing a bit more securely") return "OK", 200
Następnie, aby zadzwonić do niego z pracy crona:
źródło
Inną alternatywą może być użycie Flask-APScheduler, który dobrze współgra z Flaskiem, np .:
Więcej informacji tutaj:
https://pypi.python.org/pypi/Flask-APScheduler
źródło
Kompletny przykład wykorzystujący harmonogram i przetwarzanie wieloprocesowe, ze sterowaniem włączaniem i wyłączaniem oraz parametrem funkcji run_job (), kody powrotne są uproszczone, a interwał jest ustawiony na 10 sekund, a następnie
every(2).hour.do()
na 2 godziny. Harmonogram jest imponujący, nie dryfuje i nigdy nie widziałem, aby podczas planowania był oddalony o więcej niż 100 ms. Używanie wieloprocesorowości zamiast wątkowania, ponieważ ma metodę kończenia.#!/usr/bin/env python3 import schedule import time import datetime import uuid from flask import Flask, request from multiprocessing import Process app = Flask(__name__) t = None job_timer = None def run_job(id): """ sample job with parameter """ global job_timer print("timer job id={}".format(id)) print("timer: {:.4f}sec".format(time.time() - job_timer)) job_timer = time.time() def run_schedule(): """ infinite loop for schedule """ global job_timer job_timer = time.time() while 1: schedule.run_pending() time.sleep(1) @app.route('/timer/<string:status>') def mytimer(status, nsec=10): global t, job_timer if status=='on' and not t: schedule.every(nsec).seconds.do(run_job, str(uuid.uuid4())) t = Process(target=run_schedule) t.start() return "timer on with interval:{}sec\n".format(nsec) elif status=='off' and t: if t: t.terminate() t = None schedule.clear() return "timer off\n" return "timer status not changed\n" if __name__ == '__main__': app.run(host='0.0.0.0', port=5000)
Testujesz to, wydając po prostu:
$ curl http://127.0.0.1:5000/timer/on timer on with interval:10sec $ curl http://127.0.0.1:5000/timer/on timer status not changed $ curl http://127.0.0.1:5000/timer/off timer off $ curl http://127.0.0.1:5000/timer/off timer status not changed
Co 10 sekund licznik czasu jest włączony, wysyła komunikat licznika do konsoli:
127.0.0.1 - - [18/Sep/2018 21:20:14] "GET /timer/on HTTP/1.1" 200 - timer job id=b64ed165-911f-4b47-beed-0d023ead0a33 timer: 10.0117sec timer job id=b64ed165-911f-4b47-beed-0d023ead0a33 timer: 10.0102sec
źródło
Możesz chcieć użyć jakiegoś mechanizmu kolejkowania z harmonogramem, takim jak planista RQ lub czymś cięższym, jak Seler (najprawdopodobniej przesada).
źródło