Limit czasu w wywołaniu funkcji

300

Wywołuję funkcję w Pythonie, która, jak wiem, może się zawiesić i zmusić mnie do ponownego uruchomienia skryptu.

Jak wywołać funkcję lub w co ją owinąć, aby skoro trwa dłużej niż 5 sekund, skrypt ją anuluje i zrobi coś innego?

Teifion
źródło

Odpowiedzi:

227

Możesz użyć pakietu sygnału , jeśli pracujesz w systemie UNIX:

In [1]: import signal

# Register an handler for the timeout
In [2]: def handler(signum, frame):
   ...:     print("Forever is over!")
   ...:     raise Exception("end of time")
   ...: 

# This function *may* run for an indetermined time...
In [3]: def loop_forever():
   ...:     import time
   ...:     while 1:
   ...:         print("sec")
   ...:         time.sleep(1)
   ...:         
   ...:         

# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0

# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0

In [6]: try:
   ...:     loop_forever()
   ...: except Exception, exc: 
   ...:     print(exc)
   ....: 
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time

# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0

10 sekund po wywołaniu alarm.alarm(10)zostaje wywołany przewodnik. Rodzi to wyjątek, który można przechwycić ze zwykłego kodu Pythona.

Ten moduł nie działa dobrze z wątkami (ale w takim razie, kto?)

Pamiętaj, że ponieważ zgłaszamy wyjątek, gdy nastąpi przekroczenie limitu czasu, może on zostać złapany i zignorowany wewnątrz funkcji, na przykład jednej z takich funkcji:

def loop_forever():
    while 1:
        print('sec')
        try:
            time.sleep(10)
        except:
            continue
piro
źródło
5
Używam Python 2.5.4. Wystąpił taki błąd: Traceback (ostatnie ostatnie połączenie): Plik „aa.py”, wiersz 85, w func signal.signal (signal.SIGALRM, moduł obsługi) AttributeError: Obiekt „module” nie ma atrybutu „
SIGALRM
11
@flypen, ponieważ signal.alarmi powiązane SIGALRMnie są dostępne na platformach Windows.
Podwójny AA
2
Jeśli jest wiele procesów i każde połączenie signal.signal--- czy wszystkie działają poprawnie? Czy każde signal.signalpołączenie nie anuluje „jednoczesnego” jednego?
Brownian
1
Ostrzeżenie dla tych, którzy chcą używać tego rozszerzenia C: Program obsługi sygnałów Python nie będzie wywoływany, dopóki funkcja C nie zwróci kontroli nad interpreterem Python. W tym przypadku użyj odpowiedzi ATOzTOA: stackoverflow.com/a/14924210/1286628
wkschwartz
13
Drugie ostrzeżenie o wątkach. signal.alarm działa tylko w głównym wątku. Próbowałem użyć tego w widokach Django - natychmiastowy błąd z verbage o głównym wątku.
JL Peyret
154

Możesz użyć multiprocessing.Processdo zrobienia dokładnie tego.

Kod

import multiprocessing
import time

# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(10)

    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."

        # Terminate
        p.terminate()
        p.join()
ATOzTOA
źródło
36
Jak mogę uzyskać wartość zwracaną metody docelowej?
bad_keypoints
4
Nie działa to, jeśli wywoływana funkcja utknie w bloku we / wy.
sudo
4
@bad_keypoints Zobacz tę odpowiedź: stackoverflow.com/a/10415215/1384471 Zasadniczo przekazujesz listę, w której umieszczasz odpowiedź.
Peter
1
@sudo następnie usuń join(). to sprawia, że ​​twoja liczba współbieżnych podprocesów jest uruchomiona, dopóki nie zakończą pracy, lub ilość zdefiniowana w join(10). Jeśli masz blokujące operacje we / wy dla 10 procesów, używając join (10) ustawiłeś je tak, aby czekały na maksimum 10 dla KAŻDEGO rozpoczętego procesu. Użyj flagi demona, jak w tym przykładzie stackoverflow.com/a/27420072/2480481 . Oczywiście możesz przekazać flagę daemon=Truebezpośrednio do multiprocessing.Process()funkcji.
m3nda
2
@ATOzTOA problemem z tym rozwiązaniem, przynajmniej dla moich celów, jest to, że potencjalnie nie pozwala ono dzieciom na czyszczenie po sobie. Z dokumentacji funkcji zakończeniaterminate() ... Note that exit handlers and finally clauses, etc., will not be executed. Note that descendant processes of the process will not be terminated – they will simply become orphaned.
abalcerek
78

Jak wywołać funkcję lub w co ją owinąć, aby skoro anulowanie trwa dłużej niż 5 sekund, skrypt ją anuluje?

Zamieściłem istotę , która rozwiązuje to pytanie / problem z dekorator a threading.Timer. Oto podział.

Importuje i konfiguruje dla kompatybilności

Został przetestowany w Pythonie 2 i 3. Powinien także działać w systemach Unix / Linux i Windows.

Najpierw import. Starają się zachować spójność kodu niezależnie od wersji Pythona:

from __future__ import print_function
import sys
import threading
from time import sleep
try:
    import thread
except ImportError:
    import _thread as thread

Użyj kodu niezależnego od wersji:

try:
    range, _print = xrange, print
    def print(*args, **kwargs): 
        flush = kwargs.pop('flush', False)
        _print(*args, **kwargs)
        if flush:
            kwargs.get('file', sys.stdout).flush()            
except NameError:
    pass

Teraz zaimportowaliśmy naszą funkcjonalność ze standardowej biblioteki.

exit_after dekorator

Następnie potrzebujemy funkcji, aby zakończyć main()wątek potomny:

def quit_function(fn_name):
    # print to stderr, unbuffered in Python 2.
    print('{0} took too long'.format(fn_name), file=sys.stderr)
    sys.stderr.flush() # Python 3 stderr is likely buffered.
    thread.interrupt_main() # raises KeyboardInterrupt

A oto sam dekorator:

def exit_after(s):
    '''
    use as decorator to exit process if 
    function takes longer than s seconds
    '''
    def outer(fn):
        def inner(*args, **kwargs):
            timer = threading.Timer(s, quit_function, args=[fn.__name__])
            timer.start()
            try:
                result = fn(*args, **kwargs)
            finally:
                timer.cancel()
            return result
        return inner
    return outer

Stosowanie

A oto użycie, które bezpośrednio odpowiada na pytanie o wyjściu po 5 sekundach:

@exit_after(5)
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        sleep(1)
    print('countdown finished')

Próbny:

>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 6, in countdown
KeyboardInterrupt

Drugie wywołanie funkcji nie zakończy się, zamiast tego proces powinien zostać zakończony za pomocą śledzenia zwrotnego!

KeyboardInterrupt nie zawsze zatrzymuje śpiący wątek

Pamiętaj, że uśpienie nie zawsze będzie przerywane przerwaniem klawiatury, w Pythonie 2 w systemie Windows, np .:

@exit_after(1)
def sleep10():
    sleep(10)
    print('slept 10 seconds')

>>> sleep10()
sleep10 took too long         # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 3, in sleep10
KeyboardInterrupt

nie jest też w stanie przerwać działania kodu w rozszerzeniach, chyba że wyraźnie to sprawdzi PyErr_CheckSignals(), zobacz Cython, Python i KeyboardInterrupt ignorowane

W każdym razie unikałbym spania wątku dłużej niż sekundę - to eon czasu procesora.

Jak wywołać funkcję lub w co ją owinąć, aby skoro trwa dłużej niż 5 sekund, skrypt ją anuluje i zrobi coś innego?

Aby go złapać i zrobić coś innego, możesz złapać KeyboardInterrupt.

>>> try:
...     countdown(10)
... except KeyboardInterrupt:
...     print('do something else')
... 
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else
Aaron Hall
źródło
Nie przeczytałem jeszcze całego twojego postu, ale po prostu zastanawiałem się: co jeśli kolor jest 0? To byłoby interpretowane jako Fałsz w poniższym wyrażeniu if, prawda?
Koenraad van Duin
2
Dlaczego muszę dzwonić thread.interrupt_main(), dlaczego nie mogę bezpośrednio zgłosić wyjątku?
Anirban Nag 'tintinmj'
Masz jakieś przemyślenia na temat multiprocessing.connection.Clienttego? - Próbuję rozwiązać: stackoverflow.com/questions/57817955/…
wwii
51

Mam inną propozycję, która jest czystą funkcją (z tym samym interfejsem API co sugestia wątków) i wydaje się działać dobrze (w oparciu o sugestie dotyczące tego wątku)

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result
Alex
źródło
3
Powinieneś także przywrócić oryginalny moduł obsługi sygnału. Zobacz stackoverflow.com/questions/492519/...
Martin Konecny
9
Jeszcze jedna uwaga: metoda sygnału uniksowego działa tylko wtedy, gdy zastosujesz ją w głównym wątku. Zastosowanie go w wątku podrzędnym powoduje wyjątek i nie będzie działać.
Martin Konecny
12
To nie jest najlepsze rozwiązanie, ponieważ działa tylko na systemie Linux.
maks.
17
Max, nieprawda - działa na każdym Uniksie zgodnym z POSIX. Myślę, że twój komentarz powinien być dokładniejszy, nie działa w systemie Windows.
Chris Johnson
6
Powinieneś unikać ustawiania kwargsa na pusty dyktand. Często używaną metodą Pythona jest to, że domyślne argumenty funkcji są zmienne. Aby słownik był udostępniany we wszystkich połączeniach z timeout. O wiele lepiej jest ustawić wartość domyślną Nonei dodać w pierwszym wierszu funkcji kwargs = kwargs or {}. Args jest w porządku, ponieważ krotek nie można modyfikować.
scottmrogowski,
31

Natknąłem się na ten wątek, szukając limitu czasu w testach jednostkowych. Nie znalazłem nic prostego w odpowiedziach ani paczkach stron trzecich, więc napisałem poniżej dekorator, który możesz wrzucić do kodu:

import multiprocessing.pool
import functools

def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator

To jest tak proste, jak przekroczenie limitu czasu testu lub dowolnej funkcji, którą lubisz:

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...
Bogaty
źródło
14
Uważaj, ponieważ nie kończy to działania po upływie limitu czasu!
Sylvain
Zauważ, że w systemie Windows powoduje to zupełnie nowy proces - który zajmie czas do przekroczenia limitu czasu, być może o wiele, jeśli konfiguracja zależności zajmie dużo czasu.
Aaron Hall
1
Tak, to wymaga drobnych poprawek. Pozostawia nici na zawsze.
sudo
2
IDK, jeśli jest to najlepszy sposób, ale możesz spróbować / catch Exceptionwewnątrz func_wrapper i zrobić pool.close()po złapaniu, aby upewnić się, że wątek zawsze umiera później, bez względu na wszystko. Następnie możesz rzucić TimeoutErrorlub cokolwiek chcesz. Wydaje się dla mnie pracować.
sudo
2
Jest to przydatne, ale kiedy już to robię wiele razy, dostaję RuntimeError: can't start new thread. Czy nadal będzie działać, jeśli go zignoruję, czy jest coś innego, co mogę zrobić, aby to obejść? Z góry dziękuję!
Benjie
19

stopitPakiet, znalezionych na PyPI, wydaje się dobrze obsłużyć limity czasu.

Podoba mi się @stopit.threading_timeoutabledekorator, który dodaje timeoutparametr do dekorowanej funkcji, która robi to, czego oczekujesz, zatrzymuje funkcję.

Sprawdź to na pypi: https://pypi.python.org/pypi/stopit

eglandia
źródło
1
Jest bardzo poręczny i bezpieczny dla nici! Dzięki i Plus jeden! To najlepsza opcja, jaką znalazłem do tej pory, a nawet lepsza niż zaakceptowana odpowiedź !!
Yahya
Biblioteka twierdzi, że niektóre funkcje nie działają w systemie Windows.
Stefan Simik
16

Istnieje wiele sugestii, ale żadna z nich nie używa współbieżnych. Przyszłości, co moim zdaniem jest najbardziej czytelnym sposobem na poradzenie sobie z tym.

from concurrent.futures import ProcessPoolExecutor

# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
    with ProcessPoolExecutor() as p:
        f = p.submit(fnc, *args, **kwargs)
        return f.result(timeout=5)

Super prosty do odczytania i utrzymania.

Tworzymy pulę, przesyłamy pojedynczy proces, a następnie czekamy do 5 sekund przed podniesieniem błędu limitu czasu, który można złapać i obsłużyć w razie potrzeby.

Natywny dla Pythona 3.2+ i backportowany do 2.7 (futures instalujacy pip).

Przełączanie pomiędzy gwintem i sposobów jest to tak proste jak wymiana ProcessPoolExecutorz ThreadPoolExecutor.

Jeśli chcesz zakończyć Proces po upływie limitu czasu, sugeruję zajrzenie do Pebble .

Brian
źródło
2
Co oznacza „Ostrzeżenie: nie kończy funkcji, jeśli upłynął limit czasu”?
Scott Stafford,
5
@ScottStafford Procesy / wątki nie kończą się tylko dlatego, że został zgłoszony błąd TimeoutError. Tak więc proces lub wątek nadal będzie próbował uruchomić się do końca i nie da ci automatycznie kontroli po przekroczeniu limitu czasu.
Brian
Czy pozwoliłoby mi to zapisać wyniki pośrednie w tym czasie? np. jeśli mam funkcję rekurencyjną, dla której ustawiłem limit czasu na 5, i w tym czasie mam wyniki częściowe, jak napisać funkcję zwracającą wyniki częściowe po przekroczeniu limitu czasu?
SumNeuron
Używam dokładnie tego, jednak mam 1000 zadań, każde jest dozwolone 5 sekund przed upływem limitu czasu. Mój problem polega na tym, że rdzenie są zapychane w zadaniach, które nigdy się nie kończą, ponieważ limit czasu jest stosowany tylko w sumie zadań, a nie poszczególnych zadań. concurrent.futures nie zapewnia rozwiązania tego problemu.
Bastiaan
11

Świetny, łatwy w użyciu i niezawodny dekorator przekroczenia limitu czasu projektu PyPi ( https://pypi.org/project/timeout-decorator/ )

instalacja :

pip install timeout-decorator

Zastosowanie :

import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print "Start"
    for i in range(1,10):
        time.sleep(1)
        print "%d seconds have passed" % i

if __name__ == '__main__':
    mytest()
Gil
źródło
2
Doceniam jasne rozwiązanie. Ale czy ktokolwiek mógłby wyjaśnić, jak działa ta biblioteka, szczególnie w przypadku wielowątkowości. Osobiście boję się używać nieznanego machanizmu do obsługi wątków lub sygnałów.
wsysuper
@wsysuper lib ma 2 tryby działania: otwórz nowy wątek lub nowy podproces (który prawdopodobnie jest bezpieczny dla wątków)
Gil
6

Jestem autorem wrapt_timeout_decorator

Większość przedstawionych tutaj rozwiązań działa na pierwszy rzut oka pod Linuksem - ponieważ mamy fork () i sygnały () - ale w systemie Windows wygląda to nieco inaczej. A jeśli chodzi o wątki w Linuksie, nie możesz już używać Sygnałów.

Aby odrodzić proces w systemie Windows, musi być możliwy do odebrania - a wiele udekorowanych funkcji lub metod klasowych nie.

Musisz więc użyć lepszego piklera, takiego jak koperek i proces wieloprocesowy (nie marynowany i wieloprocesowy) - dlatego nie możesz używać ProcessPoolExecutor (lub tylko z ograniczoną funkcjonalnością).

Dla samego limitu czasu - Musisz zdefiniować, co oznacza limit czasu - ponieważ w systemie Windows zajmie to sporo (i nie można go określić) czasu na odrodzenie procesu. Może to być trudne w przypadku krótkich limitów czasu. Załóżmy, że odrodzenie procesu zajmuje około 0,5 sekundy (łatwo !!!). Jeśli limit czasu wynosi 0,2 sekundy, co powinno się stać? Czy funkcja powinna upłynąć po 0,5 + 0,2 sekundy (więc niech metoda będzie działać przez 0,2 sekundy)? A może wywoływany proces wygaśnie po 0,2 sekundy (w takim przypadku funkcja dekoracyjna ZAWSZE przekroczy limit czasu, ponieważ w tym czasie nie jest nawet odradzana)?

Również zagnieżdżone dekoratory mogą być nieprzyjemne i nie można używać sygnałów w podtytule. Jeśli chcesz stworzyć prawdziwie uniwersalny, wieloplatformowy dekorator, wszystko to należy wziąć pod uwagę (i przetestować).

Inne problemy to przekazywanie wyjątków z powrotem do programu wywołującego, a także problemy z logowaniem (jeśli są używane w funkcji dekorowanej - logowanie do plików w innym procesie NIE jest obsługiwane)

Próbowałem objąć wszystkie przypadki brzegowe. Możesz zajrzeć do pakietu wrapt_timeout_decorator, lub przynajmniej przetestować własne rozwiązania inspirowane najbardziej używanymi tam jednostkami.

@Alexis Eggermont - niestety nie mam wystarczającej liczby punktów do skomentowania - może ktoś inny może Cię powiadomić - myślę, że rozwiązałem problem z importem.

bitranox
źródło
3

timeout-decoratornie działają w systemie Windows, ponieważ Windows nie obsługiwał signaldobrze.

Jeśli użyjesz timeout-decorator w systemie Windows, otrzymasz następujące

AttributeError: module 'signal' has no attribute 'SIGALRM'

Niektórzy sugerowali użycie, use_signals=Falseale nie działali dla mnie.

Autor @bitranox utworzył następujący pakiet:

pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip

Przykładowy kod:

import time
from wrapt_timeout_decorator import *

@timeout(5)
def mytest(message):
    print(message)
    for i in range(1,10):
        time.sleep(1)
        print('{} seconds have passed'.format(i))

def main():
    mytest('starting')


if __name__ == '__main__':
    main()

Daje następujący wyjątek:

TimeoutError: Function mytest timed out after 5 seconds
jak gdyby
źródło
Brzmi jak bardzo miłe rozwiązanie. O dziwo, linia from wrapt_timeout_decorator import * wydaje się zabijać część moich innych importów. Na przykład dostaję ModuleNotFoundError: No module named 'google.appengine', ale nie dostaję tego błędu, jeśli nie zaimportuję wrapt_timeout_decorator
Alexis Eggermont
@AlexisEggermont Miałem zamiar używać tego z aplikacją ... więc jestem bardzo ciekawy, czy ten błąd nadal występuje?
PascalVKooten
2

Możemy użyć sygnałów do tego samego. Myślę, że poniższy przykład będzie dla ciebie przydatny. Jest bardzo prosty w porównaniu do wątków.

import signal

def timeout(signum, frame):
    raise myException

#this is an infinite loop, never ending under normal circumstances
def main():
    print 'Starting Main ',
    while 1:
        print 'in main ',

#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)

#change 5 to however many seconds you need
signal.alarm(5)

try:
    main()
except myException:
    print "whoops"
AR
źródło
1
Lepiej byłoby wybrać konkretny wyjątek i złapać tylko ten wyjątek. Gołe try: ... except: ...są zawsze złym pomysłem.
hivert
Zgadzam się z tobą Hivert.
AR
2
#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)
Hal Canary
źródło
7
Chociaż ten kod może odpowiedzieć na pytanie, dostarczenie dodatkowego kontekstu dotyczącego tego, jak i / lub dlaczego rozwiązuje problem, poprawiłoby długoterminową wartość odpowiedzi
Dan Cornilescu
1

Miałem potrzebę gniazdowa przerwania czasowe (który SIGALARM nie można zrobić), które nie będą blokowane przez time.sleep (których podejście oparte wątek nie może zrobić). Skończyłem kopiowanie i lekką modyfikację kodu stąd: http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/

Sam kod:

#!/usr/bin/python

# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/


"""alarm.py: Permits multiple SIGALRM events to be queued.

Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""

import heapq
import signal
from time import time

__version__ = '$Revision: 2539 $'.split()[1]

alarmlist = []

__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))


class TimeoutError(Exception):
    def __init__(self, message, id_=None):
        self.message = message
        self.id_ = id_


class Timeout:
    ''' id_ allows for nested timeouts. '''
    def __init__(self, id_=None, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        self.id_ = id_
    def handle_timeout(self):
        raise TimeoutError(self.error_message, self.id_)
    def __enter__(self):
        self.this_alarm = alarm(self.seconds, self.handle_timeout)
    def __exit__(self, type, value, traceback):
        try:
            cancel(self.this_alarm) 
        except ValueError:
            pass


def __clear_alarm():
    """Clear an existing alarm.

    If the alarm signal was set to a callable other than our own, queue the
    previous alarm settings.
    """
    oldsec = signal.alarm(0)
    oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
    if oldsec > 0 and oldfunc != __alarm_handler:
        heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))


def __alarm_handler(*zargs):
    """Handle an alarm by calling any due heap entries and resetting the alarm.

    Note that multiple heap entries might get called, especially if calling an
    entry takes a lot of time.
    """
    try:
        nextt = __next_alarm()
        while nextt is not None and nextt <= 0:
            (tm, func, args, keys) = heapq.heappop(alarmlist)
            func(*args, **keys)
            nextt = __next_alarm()
    finally:
        if alarmlist: __set_alarm()


def alarm(sec, func, *args, **keys):
    """Set an alarm.

    When the alarm is raised in `sec` seconds, the handler will call `func`,
    passing `args` and `keys`. Return the heap entry (which is just a big
    tuple), so that it can be cancelled by calling `cancel()`.
    """
    __clear_alarm()
    try:
        newalarm = __new_alarm(sec, func, args, keys)
        heapq.heappush(alarmlist, newalarm)
        return newalarm
    finally:
        __set_alarm()


def cancel(alarm):
    """Cancel an alarm by passing the heap entry returned by `alarm()`.

    It is an error to try to cancel an alarm which has already occurred.
    """
    __clear_alarm()
    try:
        alarmlist.remove(alarm)
        heapq.heapify(alarmlist)
    finally:
        if alarmlist: __set_alarm()

i przykład użycia:

import alarm
from time import sleep

try:
    with alarm.Timeout(id_='a', seconds=5):
        try:
            with alarm.Timeout(id_='b', seconds=2):
                sleep(3)
        except alarm.TimeoutError as e:
            print 'raised', e.id_
        sleep(30)
except alarm.TimeoutError as e:
    print 'raised', e.id_
else:
    print 'nope.'
James
źródło
To również używa sygnału, dlatego nie będzie działać, jeśli zostanie wywołane z wątku.
garg10may
0

Oto niewielka poprawa podanego rozwiązania opartego na wątkach.

Poniższy kod obsługuje wyjątki :

def runFunctionCatchExceptions(func, *args, **kwargs):
    try:
        result = func(*args, **kwargs)
    except Exception, message:
        return ["exception", message]

    return ["RESULT", result]


def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            self.result = runFunctionCatchExceptions(func, *args, **kwargs)
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.isAlive():
        return default

    if it.result[0] == "exception":
        raise it.result[1]

    return it.result[1]

Wywoływanie go z 5-sekundowym limitem czasu:

result = timeout(remote_calculate, (myarg,), timeout_duration=5)
diemacht
źródło
1
Spowoduje to pojawienie się nowego wyjątku ukrywającego oryginalny ślad. Zobacz moją wersję poniżej ...
Meitham,
1
Jest to również niebezpieczne, ponieważ w runFunctionCatchExceptions()niektórych funkcjach Pythona wywoływane są GIL. Np dodaje będzie nigdy, albo za bardzo długi czas powrotu nazywany wewnątrz funkcji: eval(2**9999999999**9999999999). Zobacz stackoverflow.com/questions/22138190/…
Mikko Ohtamaa