Różnica między coroutine a future / task w Pythonie 3.5?

102

Powiedzmy, że mamy funkcję fikcyjną:

async def foo(arg):
    result = await some_remote_call(arg)
    return result.upper()

Jaka jest różnica pomiędzy:

import asyncio    

coros = []
for i in range(5):
    coros.append(foo(i))

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(coros))

I:

import asyncio

futures = []
for i in range(5):
    futures.append(asyncio.ensure_future(foo(i)))

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(futures))

Uwaga : przykład zwraca wynik, ale nie jest to główny temat pytania. Gdy wartość zwracana ma znaczenie, użyj gather()zamiast wait().

Niezależnie od wartości zwracanej, szukam jasności ensure_future(). wait(coros)i wait(futures)oba obsługują programy, więc kiedy i dlaczego powinien być zawinięty w coroutine ensure_future?

Zasadniczo, jaki jest właściwy sposób (tm), aby wykonać kilka operacji nieblokujących przy użyciu języka Python 3.5 async?

Aby uzyskać dodatkowe środki, co jeśli chcę grupować połączenia? Na przykład muszę dzwonić some_remote_call(...)1000 razy, ale nie chcę niszczyć serwera WWW / bazy danych / itp. Przy 1000 jednoczesnych połączeniach. Można to zrobić za pomocą wątku lub puli procesów, ale czy istnieje sposób, aby to zrobić asyncio?

Aktualizacja 2020 (Python 3.7+) : Nie używaj tych fragmentów. Zamiast tego użyj:

import asyncio

async def do_something_async():
    tasks = []
    for i in range(5):
        tasks.append(asyncio.create_task(foo(i)))
    await asyncio.gather(*tasks)

def do_something():
    asyncio.run(do_something_async)

Rozważ także użycie Trio , solidnej alternatywy innej firmy dla asyncio.

dzianina
źródło

Odpowiedzi:

96

Program jest funkcją generatora, która może zarówno dawać wartości, jak i przyjmować wartości z zewnątrz. Zaletą używania coroutine jest to, że możemy wstrzymać wykonywanie funkcji i wznowić ją później. W przypadku pracy w sieci sensowne jest wstrzymanie wykonywania funkcji w oczekiwaniu na odpowiedź. Możemy wykorzystać ten czas na uruchomienie innych funkcji.

Przyszłość jest jak Promiseobiekty z Javascript. Jest jak symbol zastępczy wartości, która zmaterializuje się w przyszłości. W powyższym przypadku, podczas oczekiwania na sieciowe I / O, funkcja może dać nam kontener, obietnicę, że po zakończeniu operacji wypełni kontener wartością. Trzymamy się przyszłego obiektu, a gdy zostanie spełniony, możemy wywołać na nim metodę, aby uzyskać rzeczywisty wynik.

Odpowiedź bezpośrednia: nie potrzebujesz, ensure_futurejeśli nie potrzebujesz wyników. Są dobre, jeśli potrzebujesz wyników lub odzyskasz wyjątki.

Dodatkowe kredyty: wybrałbym run_in_executori zdał Executorinstancję, aby kontrolować maksymalną liczbę pracowników.

Objaśnienia i przykładowe kody

W pierwszym przykładzie używasz coroutines. waitFunkcja przyjmuje kilka współprogram i łączy je ze sobą. Więc wait()kończy się, gdy wszystkie procedury zostaną wyczerpane (zakończone / zakończone, zwracające wszystkie wartości).

loop = get_event_loop() # 
loop.run_until_complete(wait(coros))

run_until_completeMetoda by upewnić się, że pętla jest żywy aż wykonanie zostało zakończone. Zwróć uwagę, że w tym przypadku nie otrzymujesz wyników wykonywania asynchronicznego.

W drugim przykładzie używasz ensure_futurefunkcji do zawijania procedury i zwracania Taskobiektu, który jest czymś w rodzaju Future. Program jest zaplanowany do wykonania w głównej pętli zdarzeń po wywołaniu ensure_future. Zwrócony obiekt future / task nie ma jeszcze wartości, ale z biegiem czasu, po zakończeniu operacji sieciowych, przyszły obiekt będzie zawierał wynik operacji.

from asyncio import ensure_future

futures = []
for i in range(5):
    futures.append(ensure_future(foo(i)))

loop = get_event_loop()
loop.run_until_complete(wait(futures))

W tym przykładzie robimy to samo, z wyjątkiem tego, że używamy futures, a nie tylko korektorów.

Spójrzmy na przykład, jak używać asyncio / coroutines / futures:

import asyncio


async def slow_operation():
    await asyncio.sleep(1)
    return 'Future is done!'


def got_result(future):
    print(future.result())

    # We have result, so let's stop
    loop.stop()


loop = asyncio.get_event_loop()
task = loop.create_task(slow_operation())
task.add_done_callback(got_result)

# We run forever
loop.run_forever()

Tutaj zastosowaliśmy create_taskmetodę na loopobiekcie. ensure_futurezaplanowałby zadanie w głównej pętli zdarzeń. Ta metoda umożliwia zaplanowanie programu w wybranej przez nas pętli.

Widzimy również koncepcję dodania wywołania zwrotnego przy użyciu add_done_callbackmetody na obiekcie zadania.

A Taskma miejsce, donegdy program zwraca wartość, zgłasza wyjątek lub zostaje anulowany. Istnieją metody sprawdzania tych incydentów.

Napisałem kilka postów na blogu na te tematy, które mogą pomóc:

Oczywiście więcej szczegółów znajdziesz w oficjalnej instrukcji: https://docs.python.org/3/library/asyncio.html

masnun
źródło
3
Zaktualizowałem moje pytanie, aby było trochę bardziej jasne - jeśli nie potrzebuję wyniku z programu, czy nadal muszę go używać ensure_future()? A jeśli potrzebuję wyniku, czy nie mogę po prostu użyć run_until_complete(gather(coros))?
knite
1
ensure_futureplanuje wykonanie programu w pętli zdarzeń. Więc powiedziałbym, że tak, to jest wymagane. Ale oczywiście możesz zaplanować programy za pomocą innych funkcji / metod. Tak, możesz użyć gather()- ale zbieraj będzie czekać, aż wszystkie odpowiedzi zostaną zebrane.
masnun
5
@AbuAshrafMasnun @knite gatheri waitfaktycznie zawiń podane programy jako zadania przy użyciu ensure_future(zobacz źródła tutaj i tutaj ). Więc nie ma sensu używać go ensure_futurewcześniej i nie ma to nic wspólnego z uzyskaniem wyników lub nie.
Vincent
8
@AbuAshrafMasnun @knite Również ensure_futurema loopargumentu, więc nie ma powodu, aby korzystać z loop.create_taskponad ensure_future. I run_in_executornie będzie działać z coroutines, zamiast tego należy użyć semafora .
Vincent,
2
@vincent jest powód, aby użyć create_taskover ensure_future, zobacz dokumentację . Cytatcreate_task() (added in Python 3.7) is the preferable way for spawning new tasks.
masi
24

Prosta odpowiedź

  • Wywołanie funkcji coroutine ( async def) NIE powoduje jej uruchomienia. Zwraca dodatkowe obiekty, tak jak funkcja generatora zwraca obiekty generatora.
  • await pobiera wartości z programów, tj. „wywołuje” program
  • eusure_future/create_task zaplanuj, aby program uruchomił się w pętli zdarzeń przy następnej iteracji (chociaż nie czeka na zakończenie, jak wątek demona).

Kilka przykładów kodu

Najpierw wyjaśnijmy kilka terminów:

  • funkcja coroutine, ta, którą jesteś async def;
  • obiekt coroutine, co otrzymujesz, gdy „wywołujesz” funkcję coroutine;
  • task, obiekt zawinięty wokół obiektu coroutine i uruchamiany w pętli zdarzeń.

Przypadek 1, awaitw programie

Tworzymy dwa programy, awaitjeden i używamy go create_taskdo uruchamiania drugiego.

import asyncio
import time

# coroutine function
async def p(word):
    print(f'{time.time()} - {word}')


async def main():
    loop = asyncio.get_event_loop()
    coro = p('await')  # coroutine
    task2 = loop.create_task(p('create_task'))  # <- runs in next iteration
    await coro  # <-- run directly
    await task2

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

otrzymasz wynik:

1539486251.7055213 - await
1539486251.7055705 - create_task

Wyjaśnić:

zadanie1 zostało wykonane bezpośrednio, a zadanie2 zostało wykonane w kolejnej iteracji.

Przypadek 2, oddanie kontroli pętli zdarzeń

Jeśli zastąpimy główną funkcję, zobaczymy inny wynik:

async def main():
    loop = asyncio.get_event_loop()
    coro = p('await')
    task2 = loop.create_task(p('create_task'))  # scheduled to next iteration
    await asyncio.sleep(1)  # loop got control, and runs task2
    await coro  # run coro
    await task2

otrzymasz wynik:

-> % python coro.py
1539486378.5244057 - create_task
1539486379.5252144 - await  # note the delay

Wyjaśnić:

Podczas wywoływania asyncio.sleep(1)formant został przekazany z powrotem do pętli zdarzeń, a pętla sprawdza zadania do uruchomienia, a następnie uruchamia zadanie utworzone przez create_task.

Zwróć uwagę, że najpierw wywołujemy funkcję coroutine, ale nie awaitją, więc utworzyliśmy tylko jeden coroutine i nie uruchamialiśmy go. Następnie ponownie wywołujemy funkcję coroutine i zawijamy ją w create_taskwywołanie, creat_task faktycznie zaplanuje działanie programu w następnej iteracji. W rezultacie create taskjest wykonywany wcześniej await.

Właściwie chodzi o przywrócenie kontroli nad pętlą, której można by użyć, asyncio.sleep(0)aby zobaczyć ten sam wynik.

Pod maską

loop.create_taskfaktycznie dzwoni asyncio.tasks.Task(), który zadzwoni loop.call_soon. I loop.call_soonwłączy zadanie loop._ready. Podczas każdej iteracji pętli sprawdza wszystkie wywołania zwrotne w loop._ready i uruchamia je.

asyncio.wait, asyncio.ensure_futurea asyncio.gatherwłaściwie zadzwoń loop.create_taskbezpośrednio lub pośrednio.

Zwróć również uwagę w dokumentach :

Callbacki są wywoływane w kolejności, w jakiej zostały zarejestrowane. Każde wywołanie zwrotne zostanie wywołane dokładnie raz.

ospider
źródło
1
Dzięki za czyste wyjaśnienie! Muszę powiedzieć, że to dość okropny projekt. Interfejs API wysokiego poziomu przecieka abstrakcję niskiego poziomu, która nadmiernie komplikuje interfejs API.
Boris Burkov
1
sprawdź projekt curio, który jest dobrze zaprojektowany
ospider
Niezłe wyjaśnienie! Myślę, że efekt await task2wezwania mógłby zostać wyjaśniony. W obu przykładach wywołanie loop.create_task () jest tym, co planuje zadanie2 w pętli zdarzeń. Więc w obu ex można usunąć, await task2a task2 w końcu zostanie uruchomiony. W ex2 zachowanie będzie identyczne, ponieważ await task2uważam, że jest to po prostu planowanie już ukończonego zadania (które nie zostanie uruchomione po raz drugi), podczas gdy w ex1 zachowanie będzie nieco inne, ponieważ zadanie2 nie zostanie wykonane do zakończenia main. Aby zobaczyć różnicę, dodaj print("end of main")na końcu głównej części ex1
Andrzej
11

Komentarz Vincenta pod linkiem https://github.com/python/asyncio/blob/master/asyncio/tasks.py#L346 , który pokazuje, że wait()wszystko to jest ensure_future()dla Ciebie!

Innymi słowy, potrzebujemy przyszłości, a programy po cichu zostaną w nie przekształcone.

Zaktualizuję tę odpowiedź, gdy znajdę ostateczne wyjaśnienie, jak grupować coroutines / futures.

dzianina
źródło
Czy to znaczy, że dla współprogram obiektu c, await cjest odpowiednikiem await create_task(c)?
Alexey
3

Z BDFL [2013]

Zadania

  • To program zawinięty w przyszłość
  • class Zadanie jest podklasą klasy Future
  • Działa to też z await !

  • Czym różni się od nagiego programu?
  • Może robić postępy, nie czekając na to
    • Dopóki czekasz na coś innego, tj
      • czekaj [coś_else]

Mając to na uwadze, ensure_futurema sens jako nazwa do tworzenia Zadania, ponieważ wynik Przyszłości zostanie obliczony bez względu na to, czy czekasz na to (o ile czekasz na coś). Dzięki temu pętla zdarzeń może zakończyć Twoje zadanie, gdy czekasz na inne rzeczy. Zauważ, że w Pythonie 3.7 create_taskjest preferowanym sposobem zapewnienia przyszłości .

Uwaga: zmieniłem „ustępowanie” na slajdach Guido na „czekaj” tutaj na nowoczesność.

crizCraig
źródło