„Odpal i zapomnij” Python async / await

115

Czasami trzeba wykonać jakąś niekrytyczną operację asynchroniczną, ale nie chcę czekać na jej zakończenie. W standardowej implementacji Tornado można „odpalić i zapomnieć” funkcję asynchroniczną, po prostu pomijając yieldsłowo kluczowe.

Próbowałem wymyślić, jak „odpalić i zapomnieć” z nową składnią async/ awaitwydaną w Pythonie 3.5. Np. Uproszczony fragment kodu:

async def async_foo():
    print("Do some stuff asynchronously here...")

def bar():
    async_foo()  # fire and forget "async_foo()"

bar()

Jednak dzieje się tak, że bar()nigdy się nie uruchamia, a zamiast tego otrzymujemy ostrzeżenie w czasie wykonywania:

RuntimeWarning: coroutine 'async_foo' was never awaited
  async_foo()  # fire and forget "async_foo()"
Mike N
źródło
Związane z? stackoverflow.com/q/32808893/1639625 W rzeczywistości myślę, że jest to duplikat, ale nie chcę go natychmiastowo dupe. Czy ktoś może potwierdzić?
tobias_k
3
@tobias_k, nie sądzę, że to duplikat. Odpowiedź pod linkiem jest zbyt szeroka, aby udzielić odpowiedzi na to pytanie.
Michaił Gierasimow
2
Czy (1) Twój „główny” proces trwa wiecznie? A może (2) czy chcesz, aby proces umarł, ale zapomniane zadania będą kontynuowane? A może (3) wolisz, aby główny proces czekał na zapomniane zadania tuż przed zakończeniem?
Julien Palard

Odpowiedzi:

170

Aktualizacja:

Wymień asyncio.ensure_futuresię asyncio.create_taskwszędzie, jeśli używasz Python> = 3.7 jest nowszy, ładniejszy sposób do odradzania zadania .


asyncio.Zrób „odpal i zapomnij”

Zgodnie z dokumentacją Pythona asyncio.Task, możliwe jest uruchomienie programu do wykonywania "w tle" . Zadanie utworzone przez asyncio.ensure_future funkcję nie zablokuje wykonania (dlatego funkcja wróci natychmiast!). Wygląda to na sposób na „odpalenie i zapomnienie”, jak prosiłeś.

import asyncio


async def async_foo():
    print("async_foo started")
    await asyncio.sleep(1)
    print("async_foo done")


async def main():
    asyncio.ensure_future(async_foo())  # fire and forget async_foo()

    # btw, you can also create tasks inside non-async funcs

    print('Do some actions 1')
    await asyncio.sleep(1)
    print('Do some actions 2')
    await asyncio.sleep(1)
    print('Do some actions 3')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Wynik:

Do some actions 1
async_foo started
Do some actions 2
async_foo done
Do some actions 3

Co się stanie, jeśli zadania są wykonywane po zakończeniu pętli zdarzeń?

Zauważ, że asyncio oczekuje, że zadanie zostanie zakończone w momencie zakończenia pętli zdarzeń. Więc jeśli zmienisz main()na:

async def main():
    asyncio.ensure_future(async_foo())  # fire and forget

    print('Do some actions 1')
    await asyncio.sleep(0.1)
    print('Do some actions 2')

Po zakończeniu programu otrzymasz to ostrzeżenie:

Task was destroyed but it is pending!
task: <Task pending coro=<async_foo() running at [...]

Aby temu zapobiec, możesz po prostu poczekać na wszystkie oczekujące zadania po zakończeniu pętli zdarzeń:

async def main():
    asyncio.ensure_future(async_foo())  # fire and forget

    print('Do some actions 1')
    await asyncio.sleep(0.1)
    print('Do some actions 2')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    # Let's also finish all running tasks:
    pending = asyncio.Task.all_tasks()
    loop.run_until_complete(asyncio.gather(*pending))

Zabijaj zadania zamiast czekać na nie

Czasami nie chcesz czekać na wykonanie zadań (na przykład niektóre zadania mogą być tworzone tak, aby działały wiecznie). W takim przypadku możesz je po prostu anulować () zamiast czekać:

import asyncio
from contextlib import suppress


async def echo_forever():
    while True:
        print("echo")
        await asyncio.sleep(1)


async def main():
    asyncio.ensure_future(echo_forever())  # fire and forget

    print('Do some actions 1')
    await asyncio.sleep(1)
    print('Do some actions 2')
    await asyncio.sleep(1)
    print('Do some actions 3')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    # Let's also cancel all running tasks:
    pending = asyncio.Task.all_tasks()
    for task in pending:
        task.cancel()
        # Now we should await task to execute it's cancellation.
        # Cancelled task raises asyncio.CancelledError that we can suppress:
        with suppress(asyncio.CancelledError):
            loop.run_until_complete(task)

Wynik:

Do some actions 1
echo
Do some actions 2
echo
Do some actions 3
echo
Michaił Gierasimow
źródło
Skopiowałem i przeszedłem przez pierwszy blok i po prostu uruchomiłem go na swoim końcu iz jakiegoś powodu otrzymałem: linia 4 async def async_foo (): ^ Jakby był jakiś błąd składniowy z definicją funkcji w linii 4: "async def async_foo ( ): „Czy coś mi brakuje?
Gil Allen
3
@GilAllen ta składnia działa tylko w Pythonie 3.5+. Python 3.4 wymaga starej składni (patrz docs.python.org/3.4/library/asyncio-task.html ). Python 3.3 i starsze wersje w ogóle nie obsługują asyncio.
Michaił Gierasimow
Jak zabiłbyś zadania w wątku?… „Mam wątek, który tworzy pewne zadania i chcę zabić wszystkie oczekujące, gdy wątek umiera w swojej stop()metodzie.
Sardathrion - przeciwko nadużyciom SE
@Sardathrion Nie jestem pewien, czy zadanie wskazuje gdzieś w wątku, w którym zostało utworzone, ale nic nie stoi na przeszkodzie, aby śledzić je ręcznie: na przykład po prostu dodaj wszystkie zadania utworzone w wątku do listy, a gdy nadejdzie czas, anuluj je w sposób wyjaśniony powyżej.
Michaił Gierasimow
2
Zauważ, że „Task.all_tasks () jest przestarzałe od Pythona 3.7, zamiast tego użyj asyncio.all_tasks ()”
Alexis
12

Dziękuję Siergiej za zwięzłą odpowiedź. Oto zdobiona wersja tego samego.

import asyncio
import time

def fire_and_forget(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, *kwargs)

    return wrapped

@fire_and_forget
def foo():
    time.sleep(1)
    print("foo() completed")

print("Hello")
foo()
print("I didn't wait for foo()")

Produkuje

>>> Hello
>>> foo() started
>>> I didn't wait for foo()
>>> foo() completed

Uwaga: sprawdź moją drugą odpowiedź, która robi to samo, używając zwykłych wątków.

nehem
źródło
Doświadczyłem znacznego spowolnienia po zastosowaniu tego podejścia, tworząc ~ 5 małych zadań typu „uruchom i zapomnij” na sekundę. Nie używaj tego w środowisku produkcyjnym do długotrwałego zadania. Zje twój procesor i pamięć!
pir
10

To nie jest całkowicie asynchroniczne wykonanie, ale może run_in_executor () jest dla Ciebie odpowiednie.

def fire_and_forget(task, *args, **kwargs):
    loop = asyncio.get_event_loop()
    if callable(task):
        return loop.run_in_executor(None, task, *args, **kwargs)
    else:    
        raise TypeError('Task must be a callable')

def foo():
    #asynchronous stuff here


fire_and_forget(foo)
Sergey Gornostaev
źródło
3
Ładna, zwięzła odpowiedź. Warto zauważyć, że executorwola domyślnie dzwoni concurrent.futures.ThreadPoolExecutor.submit(). Wspominam, ponieważ tworzenie wątków nie jest darmowe; fire-and-zapominając 1000 razy na sekundę prawdopodobnie spowoduje duże obciążenie dla gospodarki gwintu
Brad Solomon
Tak. Nie zważałem na Twoje ostrzeżenie i doświadczyłem znacznego spowolnienia po zastosowaniu tego podejścia, tworząc ~ 5 małych zadań typu „uruchom i zapomnij” na sekundę. Nie używaj tego w środowisku produkcyjnym do długotrwałego zadania. Zje twój procesor i pamięć!
pir
3

Z jakiegoś powodu, jeśli nie możesz użyć, asynciooto implementacja wykorzystująca zwykłe wątki. Sprawdź moje inne odpowiedzi i odpowiedź Siergieja.

import threading

def fire_and_forget(f):
    def wrapped():
        threading.Thread(target=f).start()

    return wrapped

@fire_and_forget
def foo():
    time.sleep(1)
    print("foo() completed")

print("Hello")
foo()
print("I didn't wait for foo()")
nehem
źródło
Gdybyśmy potrzebowali tylko tej funkcjonalności fire_and_forget i niczego więcej od asyncio, czy nadal byłoby lepiej użyć asyncio? Jakie są korzyści?
pir