Wykonywanie asynchronicznego zadania w Flask

104

Piszę aplikację w Flasku, która działa naprawdę dobrze, z wyjątkiem tego, że WSGIjest synchroniczna i blokująca. Mam w szczególności jedno zadanie, które wywołuje interfejs API innej firmy i to zadanie może zająć kilka minut. Chciałbym wykonać to połączenie (w rzeczywistości jest to seria połączeń) i pozwolić mu działać. podczas gdy kontrola jest zwracana do Flaska.

Mój widok wygląda następująco:

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    # do stuff
    return Response(
        mimetype='application/json',
        status=200
    )

Teraz chcę mieć linię

final_file = audio_class.render_audio()

uruchomić i dostarczyć wywołanie zwrotne do wykonania, gdy metoda zwróci, podczas gdy Flask może kontynuować przetwarzanie żądań. Jest to jedyne zadanie, które Flask musi wykonywać asynchronicznie i chciałbym uzyskać porady, jak najlepiej to zaimplementować.

Patrzyłem na Twisted i Klein, ale nie jestem pewien, czy są przesadą, ponieważ może wystarczyłby Threading. A może seler jest do tego dobrym wyborem?

Darwin Tech
źródło
Zwykle używam seler za to ... to może być przesada, ale AFAIK praca gwintowania robi dobrze w środowiskach internetowych (IIRC ...)
Joran Beasley
Dobrze. Tak - właśnie badałem Selera. To może być dobre podejście. Łatwy do wdrożenia z Flask?
Darwin Tech
heh, zwykle używam również serwera gniazd (flask-socketio) i tak, myślałem, że to całkiem proste ... najtrudniejszą częścią było zainstalowanie wszystkiego
Joran Beasley
4
Poleciłbym to sprawdzić. Ten facet pisze ogólnie świetne samouczki dla flask, a ten jest świetny do zrozumienia, jak zintegrować zadania asynchroniczne z aplikacją flask.
atlspin

Odpowiedzi:

106

Użyłbym Seler obsłużyć asynchroniczne zadanie dla Ciebie. Musisz zainstalować brokera, który będzie służył jako kolejka zadań (zalecane są RabbitMQ i Redis).

app.py:

from flask import Flask
from celery import Celery

broker_url = 'amqp://guest@localhost'          # Broker URL for RabbitMQ task queue

app = Flask(__name__)    
celery = Celery(app.name, broker=broker_url)
celery.config_from_object('celeryconfig')      # Your celery configurations in a celeryconfig.py

@celery.task(bind=True)
def some_long_task(self, x, y):
    # Do some long task
    ...

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    some_long_task.delay(x, y)                 # Call your async task and pass whatever necessary variables
    return Response(
        mimetype='application/json',
        status=200
    )

Uruchom aplikację Flask i rozpocznij kolejny proces, aby uruchomić pracownika selera.

$ celery worker -A app.celery --loglevel=debug

Chciałbym również odnieść się do Miguel Gringberg za napisać dla bardziej w przewodniku głębokości do korzystania Seler z kolby.

Connie
źródło
Seler to solidne rozwiązanie, ale nie jest to lekkie rozwiązanie, a jego ustawienie zajmuje trochę czasu.
wobbily_col
34

Gwintowanie to kolejne możliwe rozwiązanie. Chociaż rozwiązanie oparte na programie Celery jest lepsze dla aplikacji na dużą skalę, jeśli nie spodziewasz się zbyt dużego ruchu w danym punkcie końcowym, wątkowanie jest realną alternatywą.

To rozwiązanie jest oparte na prezentacji Miguela Grinberga z PyCon 2016 Flask at Scale , a konkretnie na slajdzie 41 w jego slajdzie. Jego kod jest również dostępny na github dla osób zainteresowanych oryginalnym źródłem.

Z punktu widzenia użytkownika kod działa w następujący sposób:

  1. Wykonujesz wywołanie punktu końcowego, który wykonuje długotrwałe zadanie.
  2. Ten punkt końcowy zwraca 202 Accepted z łączem do sprawdzenia stanu zadania.
  3. Wywołania linku stanu zwracają 202, gdy zadanie jest nadal uruchomione, i zwracają 200 (i wynik) po zakończeniu zadania.

Aby przekształcić wywołanie interfejsu API w zadanie w tle, po prostu dodaj dekorator @async_api.

Oto pełny przykład:

from flask import Flask, g, abort, current_app, request, url_for
from werkzeug.exceptions import HTTPException, InternalServerError
from flask_restful import Resource, Api
from datetime import datetime
from functools import wraps
import threading
import time
import uuid

tasks = {}

app = Flask(__name__)
api = Api(app)


@app.before_first_request
def before_first_request():
    """Start a background thread that cleans up old tasks."""
    def clean_old_tasks():
        """
        This function cleans up old tasks from our in-memory data structure.
        """
        global tasks
        while True:
            # Only keep tasks that are running or that finished less than 5
            # minutes ago.
            five_min_ago = datetime.timestamp(datetime.utcnow()) - 5 * 60
            tasks = {task_id: task for task_id, task in tasks.items()
                     if 'completion_timestamp' not in task or task['completion_timestamp'] > five_min_ago}
            time.sleep(60)

    if not current_app.config['TESTING']:
        thread = threading.Thread(target=clean_old_tasks)
        thread.start()


def async_api(wrapped_function):
    @wraps(wrapped_function)
    def new_function(*args, **kwargs):
        def task_call(flask_app, environ):
            # Create a request context similar to that of the original request
            # so that the task can have access to flask.g, flask.request, etc.
            with flask_app.request_context(environ):
                try:
                    tasks[task_id]['return_value'] = wrapped_function(*args, **kwargs)
                except HTTPException as e:
                    tasks[task_id]['return_value'] = current_app.handle_http_exception(e)
                except Exception as e:
                    # The function raised an exception, so we set a 500 error
                    tasks[task_id]['return_value'] = InternalServerError()
                    if current_app.debug:
                        # We want to find out if something happened so reraise
                        raise
                finally:
                    # We record the time of the response, to help in garbage
                    # collecting old tasks
                    tasks[task_id]['completion_timestamp'] = datetime.timestamp(datetime.utcnow())

                    # close the database session (if any)

        # Assign an id to the asynchronous task
        task_id = uuid.uuid4().hex

        # Record the task, and then launch it
        tasks[task_id] = {'task_thread': threading.Thread(
            target=task_call, args=(current_app._get_current_object(),
                               request.environ))}
        tasks[task_id]['task_thread'].start()

        # Return a 202 response, with a link that the client can use to
        # obtain task status
        print(url_for('gettaskstatus', task_id=task_id))
        return 'accepted', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
    return new_function


class GetTaskStatus(Resource):
    def get(self, task_id):
        """
        Return status about an asynchronous task. If this request returns a 202
        status code, it means that task hasn't finished yet. Else, the response
        from the task is returned.
        """
        task = tasks.get(task_id)
        if task is None:
            abort(404)
        if 'return_value' not in task:
            return '', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
        return task['return_value']


class CatchAll(Resource):
    @async_api
    def get(self, path=''):
        # perform some intensive processing
        print("starting processing task, path: '%s'" % path)
        time.sleep(10)
        print("completed processing task, path: '%s'" % path)
        return f'The answer is: {path}'


api.add_resource(CatchAll, '/<path:path>', '/')
api.add_resource(GetTaskStatus, '/status/<task_id>')


if __name__ == '__main__':
    app.run(debug=True)

Jurgen Strydom
źródło
Kiedy używam tego kodu, pojawia się błąd werkzeug.routing.BuildError: Nie można zbudować adresu URL punktu końcowego „gettaskstatus” z wartościami [„task_id”]. Czy czegoś mi brakuje?
Nicolas Dufaur
15

Możesz także spróbować użyć multiprocessing.Processz daemon=True; process.start()metoda nie blokuje i można natychmiast powrócić odpowiedź / statusu rozmówcy podczas gdy twój drogich sporządzi funkcyjnych w tle.

Podobny problem napotkałem podczas pracy z frameworkiem Falcon i daemonpomogło użycie procesu.

Musisz wykonać następujące czynności:

from multiprocessing import Process

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    heavy_process = Process(  # Create a daemonic process with heavy "my_func"
        target=my_func,
        daemon=True
    )
    heavy_process.start()
    return Response(
        mimetype='application/json',
        status=200
    )

# Define some heavy function
def my_func():
    time.sleep(10)
    print("Process finished")

Powinieneś natychmiast otrzymać odpowiedź, a po 10 sekundach powinieneś zobaczyć wydrukowany komunikat w konsoli.

UWAGA: Pamiętaj, że daemonicprocesy nie mogą tworzyć żadnych procesów podrzędnych.

Tomasz Bartkowiak
źródło
asynchroniczny to pewien typ współbieżności, który nie jest ani wielowątkowy, ani wieloprocesowy. Wątkowanie jest jednak znacznie bliższe w celu niż zadanie asynchroniczne,
tortal
5
Nie rozumiem twojego punktu. Autor mówi o zadaniu asynchronicznym, czyli takim, które jest uruchamiane „w tle”, tak że wywołujący nie blokuje się, dopóki nie otrzyma odpowiedzi. Tarło procesu demona jest przykładem tego, gdzie można osiągnąć taki asynchronizm.
Tomasz Bartkowiak
co, jeśli /render/<id>punkt końcowy oczekuje czegoś w wyniku my_func()?
Will Gu
Możesz my_funcna przykład wysłać odpowiedź / puls do innego punktu końcowego. Lub możesz założyć i udostępnić jakąś kolejkę wiadomości, przez którą możesz się komunikowaćmy_func
Tomasz Bartkowiak