Piszę aplikację w Flasku, która działa naprawdę dobrze, z wyjątkiem tego, że WSGI
jest synchroniczna i blokująca. Mam w szczególności jedno zadanie, które wywołuje interfejs API innej firmy i to zadanie może zająć kilka minut. Chciałbym wykonać to połączenie (w rzeczywistości jest to seria połączeń) i pozwolić mu działać. podczas gdy kontrola jest zwracana do Flaska.
Mój widok wygląda następująco:
@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
...
data = json.loads(request.data)
text_list = data.get('text_list')
final_file = audio_class.render_audio(data=text_list)
# do stuff
return Response(
mimetype='application/json',
status=200
)
Teraz chcę mieć linię
final_file = audio_class.render_audio()
uruchomić i dostarczyć wywołanie zwrotne do wykonania, gdy metoda zwróci, podczas gdy Flask może kontynuować przetwarzanie żądań. Jest to jedyne zadanie, które Flask musi wykonywać asynchronicznie i chciałbym uzyskać porady, jak najlepiej to zaimplementować.
Patrzyłem na Twisted i Klein, ale nie jestem pewien, czy są przesadą, ponieważ może wystarczyłby Threading. A może seler jest do tego dobrym wyborem?
źródło
Odpowiedzi:
Użyłbym Seler obsłużyć asynchroniczne zadanie dla Ciebie. Musisz zainstalować brokera, który będzie służył jako kolejka zadań (zalecane są RabbitMQ i Redis).
app.py
:from flask import Flask from celery import Celery broker_url = 'amqp://guest@localhost' # Broker URL for RabbitMQ task queue app = Flask(__name__) celery = Celery(app.name, broker=broker_url) celery.config_from_object('celeryconfig') # Your celery configurations in a celeryconfig.py @celery.task(bind=True) def some_long_task(self, x, y): # Do some long task ... @app.route('/render/<id>', methods=['POST']) def render_script(id=None): ... data = json.loads(request.data) text_list = data.get('text_list') final_file = audio_class.render_audio(data=text_list) some_long_task.delay(x, y) # Call your async task and pass whatever necessary variables return Response( mimetype='application/json', status=200 )
Uruchom aplikację Flask i rozpocznij kolejny proces, aby uruchomić pracownika selera.
Chciałbym również odnieść się do Miguel Gringberg za napisać dla bardziej w przewodniku głębokości do korzystania Seler z kolby.
źródło
Gwintowanie to kolejne możliwe rozwiązanie. Chociaż rozwiązanie oparte na programie Celery jest lepsze dla aplikacji na dużą skalę, jeśli nie spodziewasz się zbyt dużego ruchu w danym punkcie końcowym, wątkowanie jest realną alternatywą.
To rozwiązanie jest oparte na prezentacji Miguela Grinberga z PyCon 2016 Flask at Scale , a konkretnie na slajdzie 41 w jego slajdzie. Jego kod jest również dostępny na github dla osób zainteresowanych oryginalnym źródłem.
Z punktu widzenia użytkownika kod działa w następujący sposób:
Aby przekształcić wywołanie interfejsu API w zadanie w tle, po prostu dodaj dekorator @async_api.
Oto pełny przykład:
from flask import Flask, g, abort, current_app, request, url_for from werkzeug.exceptions import HTTPException, InternalServerError from flask_restful import Resource, Api from datetime import datetime from functools import wraps import threading import time import uuid tasks = {} app = Flask(__name__) api = Api(app) @app.before_first_request def before_first_request(): """Start a background thread that cleans up old tasks.""" def clean_old_tasks(): """ This function cleans up old tasks from our in-memory data structure. """ global tasks while True: # Only keep tasks that are running or that finished less than 5 # minutes ago. five_min_ago = datetime.timestamp(datetime.utcnow()) - 5 * 60 tasks = {task_id: task for task_id, task in tasks.items() if 'completion_timestamp' not in task or task['completion_timestamp'] > five_min_ago} time.sleep(60) if not current_app.config['TESTING']: thread = threading.Thread(target=clean_old_tasks) thread.start() def async_api(wrapped_function): @wraps(wrapped_function) def new_function(*args, **kwargs): def task_call(flask_app, environ): # Create a request context similar to that of the original request # so that the task can have access to flask.g, flask.request, etc. with flask_app.request_context(environ): try: tasks[task_id]['return_value'] = wrapped_function(*args, **kwargs) except HTTPException as e: tasks[task_id]['return_value'] = current_app.handle_http_exception(e) except Exception as e: # The function raised an exception, so we set a 500 error tasks[task_id]['return_value'] = InternalServerError() if current_app.debug: # We want to find out if something happened so reraise raise finally: # We record the time of the response, to help in garbage # collecting old tasks tasks[task_id]['completion_timestamp'] = datetime.timestamp(datetime.utcnow()) # close the database session (if any) # Assign an id to the asynchronous task task_id = uuid.uuid4().hex # Record the task, and then launch it tasks[task_id] = {'task_thread': threading.Thread( target=task_call, args=(current_app._get_current_object(), request.environ))} tasks[task_id]['task_thread'].start() # Return a 202 response, with a link that the client can use to # obtain task status print(url_for('gettaskstatus', task_id=task_id)) return 'accepted', 202, {'Location': url_for('gettaskstatus', task_id=task_id)} return new_function class GetTaskStatus(Resource): def get(self, task_id): """ Return status about an asynchronous task. If this request returns a 202 status code, it means that task hasn't finished yet. Else, the response from the task is returned. """ task = tasks.get(task_id) if task is None: abort(404) if 'return_value' not in task: return '', 202, {'Location': url_for('gettaskstatus', task_id=task_id)} return task['return_value'] class CatchAll(Resource): @async_api def get(self, path=''): # perform some intensive processing print("starting processing task, path: '%s'" % path) time.sleep(10) print("completed processing task, path: '%s'" % path) return f'The answer is: {path}' api.add_resource(CatchAll, '/<path:path>', '/') api.add_resource(GetTaskStatus, '/status/<task_id>') if __name__ == '__main__': app.run(debug=True)
źródło
Możesz także spróbować użyć
multiprocessing.Process
zdaemon=True
;process.start()
metoda nie blokuje i można natychmiast powrócić odpowiedź / statusu rozmówcy podczas gdy twój drogich sporządzi funkcyjnych w tle.Podobny problem napotkałem podczas pracy z frameworkiem Falcon i
daemon
pomogło użycie procesu.Musisz wykonać następujące czynności:
from multiprocessing import Process @app.route('/render/<id>', methods=['POST']) def render_script(id=None): ... heavy_process = Process( # Create a daemonic process with heavy "my_func" target=my_func, daemon=True ) heavy_process.start() return Response( mimetype='application/json', status=200 ) # Define some heavy function def my_func(): time.sleep(10) print("Process finished")
Powinieneś natychmiast otrzymać odpowiedź, a po 10 sekundach powinieneś zobaczyć wydrukowany komunikat w konsoli.
UWAGA: Pamiętaj, że
daemonic
procesy nie mogą tworzyć żadnych procesów podrzędnych.źródło
/render/<id>
punkt końcowy oczekuje czegoś w wynikumy_func()
?my_func
na przykład wysłać odpowiedź / puls do innego punktu końcowego. Lub możesz założyć i udostępnić jakąś kolejkę wiadomości, przez którą możesz się komunikowaćmy_func