Asynchroniczne wywołanie metody w Pythonie?

178

Zastanawiałem się, czy w Pythonie jest jakaś biblioteka do wywołań metod asynchronicznych . Byłoby wspaniale, gdybyś mógł zrobić coś takiego

@async
def longComputation():
    <code>


token = longComputation()
token.registerCallback(callback_function)
# alternative, polling
while not token.finished():
    doSomethingElse()
    if token.finished():
        result = token.result()

Lub asynchroniczne wywołanie procedury innej niż asynchroniczna

def longComputation()
    <code>

token = asynccall(longComputation())

Byłoby wspaniale mieć bardziej wyrafinowaną strategię jako natywną w rdzeniu języka. Czy to było brane pod uwagę?

Stefano Borini
źródło
Jak Python 3.4: docs.python.org/3/library/asyncio.html (tam backportu dla 3.3 i nowy, błyszczący asynci awaitskładnią od 3.5).
jonrsharpe
Nie ma mechanizmu wywołania zwrotnego, ale można agregować wyniki w słowniku i jest on oparty na module wieloprocesorowym Pythona. Jestem pewien, że możesz dodać jeszcze jeden parametr do dekorowanej funkcji jako wywołanie zwrotne. github.com/alex-sherman/deco .
RajaRaviVarma
Rozpocząć. Oficjalna dokumentacja - docs.python.org/3/library/concurrency.html
Adarsh ​​Madrecha,

Odpowiedzi:

141

Możesz użyć modułu wieloprocesorowego dodanego w Pythonie 2.6. Możesz użyć pul procesów, a następnie uzyskać wyniki asynchronicznie za pomocą:

apply_async(func[, args[, kwds[, callback]]])

Na przykład:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=1)              # Start a worker processes.
    result = pool.apply_async(f, [10], callback) # Evaluate "f(10)" asynchronously calling callback when finished.

To tylko jedna alternatywa. Ten moduł zapewnia wiele udogodnień, aby osiągnąć to, czego chcesz. Poza tym naprawdę łatwo będzie zrobić z tego dekoratora.

Lucas S.
źródło
5
Lucas S., twój przykład niestety nie działa. Funkcja zwrotna nigdy nie jest wywoływana.
DataGreed
6
Prawdopodobnie warto pamiętać, że powoduje to oddzielne procesy, a nie oddzielny wątek w procesie. Może to mieć pewne konsekwencje.
user47741
11
To działa: result = pool.apply_async (f, [10], callback = finish)
MJ
6
Aby naprawdę robić cokolwiek asynchronicznie w Pythonie, konieczne jest użycie modułu wieloprocesorowego do tworzenia nowych procesów. Samo tworzenie nowych wątków jest nadal na łasce Globalnej Blokady Interpretera, która zapobiega wykonywaniu wielu rzeczy jednocześnie przez proces Pythona.
Drahkar
2
Jeśli nie chcesz odradzać nowego procesu podczas korzystania z tego rozwiązania - zmień import na from multiprocessing.dummy import Pool. multiprocessing.dummy ma dokładnie to samo zachowanie zaimplementowane w wątkach zamiast w procesach
Almog Cohen
203

Coś jak:

import threading

thr = threading.Thread(target=foo, args=(), kwargs={})
thr.start() # Will run "foo"
....
thr.is_alive() # Will return whether foo is running currently
....
thr.join() # Will wait till "foo" is done

Więcej informacji można znaleźć w dokumentacji pod adresem https://docs.python.org/library/threading.html .

Drakosha
źródło
1
tak, jeśli po prostu musisz robić rzeczy asynchronicznie, dlaczego nie używać po prostu wątku? w końcu wątek jest lekki niż proces
kk1957
22
Ważna uwaga: standardowa implementacja (CPython) wątków nie pomoże w zadaniach związanych z obliczeniami, ze względu na „Global Interpreter Lock”. Zobacz dokumentację biblioteczną: link
solublefish
3
Czy użycie funkcji thread.join () jest naprawdę asynchroniczne? A co, jeśli nie chcesz blokować wątku (np. Wątku interfejsu użytkownika) i nie używać mnóstwa zasobów wykonujących na nim pętlę while?
Mgamerz
1
Łączenie @Mgamerz jest synchroniczne. Możesz pozwolić wątkowi umieścić wyniki wykonania w jakiejś kolejce lub / i wywołać wywołanie zwrotne. W przeciwnym razie nie wiesz, kiedy to się skończy (jeśli w ogóle).
Drakosha
1
Czy jest możliwe aby wywołać funkcję oddzwonienia na końcu wykonanie gwintu jak można zrobić z multiprocessing.Pool
Reda Drissi
49

Od wersji Python 3.5 można używać ulepszonych generatorów dla funkcji asynchronicznych.

import asyncio
import datetime

Ulepszona składnia generatora:

@asyncio.coroutine
def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        yield from asyncio.sleep(1)


loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()

Nowa async/awaitskładnia:

async def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)


loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()
camabeh
źródło
8
@carnabeh, czy mógłbyś rozszerzyć ten przykład o funkcję „def longComputation ()” OP? Większość przykładów używa "await asyncio.sleep (1)", ale jeśli longComputation () zwraca, powiedzmy, double, nie możesz po prostu użyć "await longComputation ()".
Fab
Dziesięć lat w przyszłości i teraz powinna to być akceptowana odpowiedź. Kiedy mówisz o async w pythonie 3.5+, przychodzi na myśl asyncio i słowo kluczowe async.
zeh
31

Nie ma jej w rdzeniu językowym, ale bardzo dojrzałą biblioteką, która robi to, co chcesz, jest Twisted . Wprowadza obiekt Deferred, do którego można dołączyć wywołania zwrotne lub procedury obsługi błędów („errbacks”). Odroczona jest w zasadzie „obietnicą”, że funkcja ostatecznie przyniesie skutek.

Meredith L. Patterson
źródło
1
W szczególności spójrz na twisted.internet.defer ( twistedmatrix.com/documents/8.2.0/api/ ... ).
Nicholas Riley
21

Możesz zaimplementować dekorator, aby uczynić swoje funkcje asynchronicznymi, chociaż jest to trochę trudne. multiprocessingModuł jest pełna małych dziwactw i pozornie arbitralnych ograniczeń - tym bardziej powód, aby ująć go za przyjazny interfejs, choć.

from inspect import getmodule
from multiprocessing import Pool


def async(decorated):
    r'''Wraps a top-level function around an asynchronous dispatcher.

        when the decorated function is called, a task is submitted to a
        process pool, and a future object is returned, providing access to an
        eventual return value.

        The future object has a blocking get() method to access the task
        result: it will return immediately if the job is already done, or block
        until it completes.

        This decorator won't work on methods, due to limitations in Python's
        pickling machinery (in principle methods could be made pickleable, but
        good luck on that).
    '''
    # Keeps the original function visible from the module global namespace,
    # under a name consistent to its __name__ attribute. This is necessary for
    # the multiprocessing pickling machinery to work properly.
    module = getmodule(decorated)
    decorated.__name__ += '_original'
    setattr(module, decorated.__name__, decorated)

    def send(*args, **opts):
        return async.pool.apply_async(decorated, args, opts)

    return send

Poniższy kod ilustruje użycie dekoratora:

@async
def printsum(uid, values):
    summed = 0
    for value in values:
        summed += value

    print("Worker %i: sum value is %i" % (uid, summed))

    return (uid, summed)


if __name__ == '__main__':
    from random import sample

    # The process pool must be created inside __main__.
    async.pool = Pool(4)

    p = range(0, 1000)
    results = []
    for i in range(4):
        result = printsum(i, sample(p, 100))
        results.append(result)

    for result in results:
        print("Worker %i: sum value is %i" % result.get())

W prawdziwym świecie rozwinąłbym nieco więcej na temat dekoratora, zapewniając sposób na wyłączenie go w celu debugowania (przy jednoczesnym utrzymaniu przyszłego interfejsu na miejscu) lub może ułatwienie radzenia sobie z wyjątkami; ale myślę, że to wystarczająco dobrze pokazuje zasadę.

xperroni
źródło
To powinna być najlepsza odpowiedź. Uwielbiam to, jak może zwracać wartość. Nie tak jak wątek, który po prostu działa asynchronicznie.
Aminah Nuraini
16

Właśnie

import threading, time

def f():
    print "f started"
    time.sleep(3)
    print "f finished"

threading.Thread(target=f).start()
Antigluk
źródło
8

Możesz użyć eventlet. Pozwala napisać coś, co wydaje się być kodem synchronicznym, ale działa asynchronicznie w sieci.

Oto przykład super minimalnego robota:

urls = ["http://www.google.com/intl/en_ALL/images/logo.gif",
     "https://wiki.secondlife.com/w/images/secondlife.jpg",
     "http://us.i1.yimg.com/us.yimg.com/i/ww/beta/y3.gif"]

import eventlet
from eventlet.green import urllib2

def fetch(url):

  return urllib2.urlopen(url).read()

pool = eventlet.GreenPool()

for body in pool.imap(fetch, urls):
  print "got body", len(body)
Raj
źródło
7

Moje rozwiązanie to:

import threading

class TimeoutError(RuntimeError):
    pass

class AsyncCall(object):
    def __init__(self, fnc, callback = None):
        self.Callable = fnc
        self.Callback = callback

    def __call__(self, *args, **kwargs):
        self.Thread = threading.Thread(target = self.run, name = self.Callable.__name__, args = args, kwargs = kwargs)
        self.Thread.start()
        return self

    def wait(self, timeout = None):
        self.Thread.join(timeout)
        if self.Thread.isAlive():
            raise TimeoutError()
        else:
            return self.Result

    def run(self, *args, **kwargs):
        self.Result = self.Callable(*args, **kwargs)
        if self.Callback:
            self.Callback(self.Result)

class AsyncMethod(object):
    def __init__(self, fnc, callback=None):
        self.Callable = fnc
        self.Callback = callback

    def __call__(self, *args, **kwargs):
        return AsyncCall(self.Callable, self.Callback)(*args, **kwargs)

def Async(fnc = None, callback = None):
    if fnc == None:
        def AddAsyncCallback(fnc):
            return AsyncMethod(fnc, callback)
        return AddAsyncCallback
    else:
        return AsyncMethod(fnc, callback)

I działa dokładnie zgodnie z życzeniem:

@Async
def fnc():
    pass
Nevyn
źródło
5

Coś takiego działa dla mnie, możesz wtedy wywołać funkcję, a ona wyśle ​​się do nowego wątku.

from thread import start_new_thread

def dowork(asynchronous=True):
    if asynchronous:
        args = (False)
        start_new_thread(dowork,args) #Call itself on a new thread.
    else:
        while True:
            #do something...
            time.sleep(60) #sleep for a minute
    return
Nicholas Hamilton
źródło
2

Czy jest jakiś powód, aby nie używać wątków? Możesz skorzystać z threadingklasy. Zamiast finished()funkcji użyj isAlive(). result()Funkcja mogłaby join()wątku i pobrać wynik. Jeśli możesz, zastąp funkcje run()i, __init__aby wywołać funkcję określoną w konstruktorze i zapisać wartość gdzieś w instancji klasy.

ondra
źródło
2
Jeśli jest to funkcja kosztowna obliczeniowo, wątkowanie nic nie da (prawdopodobnie spowoduje spowolnienie), ponieważ proces Pythona jest ograniczony do jednego rdzenia procesora ze względu na GIL.
Kurt
2
@Kurt, chociaż to prawda, OP nie wspomniał, że wydajność była jego zmartwieniem. Istnieją inne powody chęci zachowania asynchronicznego ...
Peter Hansen,
Wątki w Pythonie nie są świetne, gdy chcesz mieć możliwość zabicia wywołania metody asynchronicznej, ponieważ tylko główny wątek w Pythonie odbiera sygnały.
CivFan
2

Możesz użyć concurrent.futures (dodane w Pythonie 3.2).

import time
from concurrent.futures import ThreadPoolExecutor


def long_computation(duration):
    for x in range(0, duration):
        print(x)
        time.sleep(1)
    return duration * 2


print('Use polling')
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(long_computation, 5)
    while not future.done():
        print('waiting...')
        time.sleep(0.5)

    print(future.result())

print('Use callback')
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(long_computation, 5)
future.add_done_callback(lambda f: print(f.result()))

print('waiting for callback')

executor.shutdown(False)  # non-blocking

print('shutdown invoked')
Duża dynia
źródło
To bardzo dobra odpowiedź, ponieważ jako jedyna daje możliwość puli wątków z callbackami
Reda Drissi
Niestety cierpi to również na „Globalną blokadę interpretera”. Zobacz dokument biblioteki: link . Testowane w Pythonie 3.7
Alex
0

Możesz użyć procesu. Jeśli chcesz go uruchamiać na zawsze, użyj funkcji while (jak w sieci):

from multiprocessing import Process
def foo():
    while 1:
        # Do something

p = Process(target = foo)
p.start()

jeśli chcesz go uruchomić tylko jeden raz, zrób tak:

from multiprocessing import Process
def foo():
    # Do something

p = Process(target = foo)
p.start()
p.join()
Keivan
źródło