Klawiatura przerywa z wieloprocesorową pulą Pythona

136

Jak mogę obsłużyć zdarzenia KeyboardInterrupt z wieloprocesorowymi pulami Pythona? Oto prosty przykład:

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

Podczas uruchamiania powyższego kodu KeyboardInterruptzostaje podniesiony po naciśnięciu ^C, ale proces po prostu zawiesza się w tym momencie i muszę go zewnętrznie zabić.

Chcę mieć możliwość naciśnięcia ^Cw dowolnym momencie i spowodowania, aby wszystkie procesy zakończyły się z wdziękiem.

Fragsworth
źródło
Rozwiązałem swój problem za pomocą psutil, rozwiązanie możesz zobaczyć tutaj: stackoverflow.com/questions/32160054/…
Tiago Albineli Motta

Odpowiedzi:

137

To jest błąd Pythona. Podczas oczekiwania na warunek w threading.Condition.wait (), KeyboardInterrupt nigdy nie jest wysyłany. Repro:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

Wyjątek KeyboardInterrupt nie zostanie dostarczony do momentu powrotu wait () i nigdy nie powróci, więc przerwanie nigdy się nie dzieje. KeyboardInterrupt prawie na pewno powinno przerwać warunek oczekiwania.

Zauważ, że tak się nie dzieje, jeśli określono limit czasu; cond.wait (1) natychmiast otrzyma przerwanie. Tak więc obejściem jest określenie limitu czasu. Aby to zrobić, wymień

    results = pool.map(slowly_square, range(40))

z

    results = pool.map_async(slowly_square, range(40)).get(9999999)

lub podobne.

Glenn Maynard
źródło
3
Czy ten błąd jest gdzieś w oficjalnym trackerze Pythona? Mam problem ze znalezieniem tego hasła, ale prawdopodobnie nie używam najlepszych wyszukiwanych haseł.
Joseph Garvin,
18
Ten błąd został zgłoszony jako [Issue 8296] [1]. [1]: bugs.python.org/issue8296
Andrey Vlasovskikh
1
Oto hack, który naprawia pool.imap () w ten sam sposób, umożliwiając Ctrl-C podczas iteracji po imap. Złap wyjątek i wywołaj pool.terminate (), a program zakończy działanie. gist.github.com/626518
Alexander Ljungberg,
6
To nie całkiem naprawia rzeczy. Czasami otrzymuję oczekiwane zachowanie, gdy naciskam Control + C, innym razem nie. Nie jestem pewien, dlaczego, ale wygląda na to, że być może The KeyboardInterrupt jest odbierany losowo przez jeden z procesów i otrzymuję poprawne zachowanie tylko wtedy, gdy proces nadrzędny jest tym, który go przechwytuje.
Ryan C. Thompson
6
To nie działa dla mnie z Pythonem 3.6.1 w systemie Windows. Dostaję mnóstwo śladów stosu i innych śmieci, kiedy wykonuję Ctrl-C, czyli to samo, co bez takiego obejścia. W rzeczywistości żadne z rozwiązań, które wypróbowałem z tego wątku, nie wydaje się działać ...
szx
56

Z tego, co ostatnio odkryłem, najlepszym rozwiązaniem jest skonfigurowanie procesów roboczych tak, aby całkowicie ignorowały SIGINT i ograniczyły cały kod czyszczący do procesu nadrzędnego. Rozwiązuje to problem zarówno w przypadku bezczynnych, jak i zajętych procesów roboczych i nie wymaga żadnego kodu obsługi błędów w procesach podrzędnych.

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

Wyjaśnienie i pełny przykładowy kod można znaleźć odpowiednio pod adresem http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/ i http://github.com/jreese/multiprocessing-keyboardinterrupt .

John Reese
źródło
4
Cześć John. Twoje rozwiązanie nie daje tego samego, co moje, niestety, skomplikowane rozwiązanie. Ukrywa się za time.sleep(10)głównym procesem. Jeśli miałbyś usunąć to uśpienie lub jeśli czekasz, aż proces spróbuje dołączyć do puli, co musisz zrobić, aby zagwarantować ukończenie zadań, nadal cierpisz na ten sam problem, którego nie robi główny proces nie otrzyma polecenia KeyboardInterrupt podczas oczekiwania na joinoperację sondowania .
bboe
W przypadku, gdy użyłem tego kodu w produkcji, time.sleep () była częścią pętli, która sprawdzałaby stan każdego procesu potomnego, a następnie w razie potrzeby restartowała niektóre procesy z opóźnieniem. Zamiast łączenia (), które czekałoby na zakończenie wszystkich procesów, sprawdzałoby je indywidualnie, upewniając się, że proces główny pozostaje responsywny.
John Reese,
2
Więc było to bardziej zajęte oczekiwanie (może z krótkimi przerwami między sprawdzeniami), które odpytywano o zakończenie procesu za pomocą innej metody, a nie łączenie? Jeśli tak jest, być może lepiej byłoby umieścić ten kod w swoim poście na blogu, ponieważ możesz wtedy zagwarantować, że wszyscy pracownicy ukończyli pracę przed próbą dołączenia.
bboe
4
To nie działa. Sygnał wysyłany jest tylko do dzieci. Rodzic nigdy go nie otrzymuje, więc pool.terminate()nigdy nie zostaje stracony. Ignorowanie sygnału przez dzieci nic nie daje. Odpowiedź @ Glenna rozwiązuje problem.
Cerin
1
Moja wersja jest dostępna pod adresem gist.github.com/admackin/003dd646e5fadee8b8d6 ; nie wywołuje .join()z wyjątkiem przerwania - po prostu ręcznie sprawdza wynik .apply_async()użycia, AsyncResult.ready()aby zobaczyć, czy jest gotowy, co oznacza, że ​​skończyliśmy czysto.
Andy MacKinlay,
29

Z pewnych powodów tylko wyjątki dziedziczone z Exceptionklasy bazowej są obsługiwane normalnie. Jako obejście, można ponownie podnieść KeyboardInterruptjako Exceptionprzykład:

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

Zwykle otrzymasz następujący wynik:

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

Więc jeśli trafisz ^C, otrzymasz:

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end
Andrey Vlasovskikh
źródło
2
Wydaje się, że nie jest to pełne rozwiązanie. Jeśli nadejdzie KeyboardInterrupta podczas multiprocessingwykonywania własnej wymiany danych IPC, to try..catchnie zostanie aktywowany (oczywiście).
Andrey Vlasovskikh
Można wymienić raise KeyboardInterruptErrorz return. Musisz tylko upewnić się, że proces potomny zakończy się natychmiast po odebraniu KeyboardInterrupt. Zwracana wartość wydaje się być ignorowana, ale mainnadal otrzymywano KeyboardInterrupt.
Bernhard
8

Zwykle ta prosta struktura działa w przypadku Ctrl- Cw puli:

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

Jak stwierdzono w kilku podobnych postach:

Przechwytuj przerwanie klawiatury w Pythonie bez try-except

igco
źródło
1
Musiałoby to również zostać wykonane na każdym z procesów roboczych i może się nie powieść, jeśli parametr KeyboardInterrupt zostanie podniesiony podczas inicjowania biblioteki wieloprocesorowej.
MarioVilas
7

Głosowana odpowiedź nie dotyczy podstawowego problemu, ale podobny efekt uboczny.

Jesse Noller, autor wieloprocesorowej biblioteki, wyjaśnia, jak poprawnie radzić sobie z CTRL + C multiprocessing.Poolw starym poście na blogu .

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()
noxdafox
źródło
Odkryłem, że ProcessPoolExecutor również ma ten sam problem. Jedyne rozwiązanie, jakie udało mi się znaleźć, to zadzwonić os.setpgrp()z przyszłości
portforwardpodcast
1
Jasne, jedyną różnicą jest to, że ProcessPoolExecutornie obsługuje funkcji inicjatora. W systemie Unix można wykorzystać tę forkstrategię, wyłączając sighandler w głównym procesie przed utworzeniem puli, a następnie włączając go ponownie. W żwir , wyciszyć SIGINTsię na procesach potomnych domyślnie. Nie wiem, dlaczego nie robią tego samego z pulami Pythona. Na koniec użytkownik może ponownie ustawić przewodnika SIGINTna wypadek, gdyby chciał zrobić sobie krzywdę.
noxdafox
Wydaje się, że to rozwiązanie zapobiega również przerywaniu głównego procesu przez Ctrl-C.
Paul Price
1
Właśnie przetestowałem Python 3.5 i działa, jakiej wersji Pythona używasz? Jaki system operacyjny?
noxdafox
5

Wydaje się, że istnieją dwie kwestie, które powodują, że wyjątki podczas przetwarzania wieloprocesowego są irytujące. Pierwszą (zauważoną przez Glenna) jest to, że aby uzyskać natychmiastową odpowiedź , musisz użyć map_asynctimeout zamiast map(tzn. Nie kończyć przetwarzania całej listy). Drugi (zauważony przez Andreya) jest taki, że przetwarzanie wieloprocesowe nie przechwytuje wyjątków, które nie dziedziczą po Exception(np SystemExit.). Oto moje rozwiązanie, które rozwiązuje oba te problemy:

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        import traceback
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results
Paul Price
źródło
1
Nie zauważyłem żadnego spadku wydajności, ale w moim przypadku functionjest dość długi (setki sekund).
Paul Price
Tak naprawdę to już nie jest, przynajmniej z moich oczu i doświadczenia. Jeśli złapiesz wyjątek klawiatury w poszczególnych procesach potomnych i złapiesz go jeszcze raz w procesie głównym, możesz kontynuować używanie mapi wszystko jest w porządku. @Linux Cli Aikprzedstawił poniżej rozwiązanie, które powoduje takie zachowanie. Używanie map_asyncnie zawsze jest pożądane, jeśli główny wątek zależy od wyników z procesów potomnych.
Code Doggo
4

Odkryłem, że na razie najlepszym rozwiązaniem jest nie korzystanie z funkcji multiprocessing.pool, ale raczej rozwijanie własnej funkcjonalności puli. Podałem przykład demonstrujący błąd z zastosowaniem apply_async, a także przykład pokazujący, jak w ogóle uniknąć korzystania z funkcji puli.

http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/

bboe
źródło
Działa jak marzenie. To czyste rozwiązanie, a nie jakiś rodzaj hackowania (/ wydaje mi się). Btw, sztuczka z .get (99999), jak proponowali inni, bardzo szkodzi wydajności.
Walter,
Nie zauważyłem żadnego spadku wydajności wynikającego z użycia limitu czasu, chociaż używam 9999 zamiast 999999. Wyjątkiem jest sytuacja, gdy zostanie zgłoszony wyjątek, który nie dziedziczy z klasy Exception: wtedy musisz poczekać, aż limit czasu zostanie trafienie. Rozwiązaniem jest wyłapanie wszystkich wyjątków (zobacz moje rozwiązanie).
Paul Price
1

Jestem nowicjuszem w Pythonie. Szukałem odpowiedzi wszędzie i natknąłem się na ten i kilka innych blogów i filmów na youtube. Próbowałem skopiować, wkleić powyższy kod autora i odtworzyć go na moim Pythonie 2.7.13 w systemie Windows 7 64-bitowym. Jest blisko tego, co chcę osiągnąć.

Zmusiłem procesy podrzędne, aby ignorowały ControlC i powodowały zakończenie procesu nadrzędnego. Wygląda na to, że omijanie procesu potomnego pozwala mi uniknąć tego problemu.

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass


def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results


if __name__ == '__main__':
    go()

Część rozpoczynająca się od pool.terminate()nigdy nie wydaje się być wykonywana.

Linux Cli Aik
źródło
Właśnie to rozgryzłem! Szczerze uważam, że to najlepsze rozwiązanie takiego problemu. Przyjęte rozwiązanie wymusza map_asyncna użytkowniku, co mi się nie podoba. W wielu sytuacjach, takich jak moja, główny wątek musi poczekać na zakończenie poszczególnych procesów. To jeden z powodów, dla których mapistnieje!
Code Doggo
1

Możesz spróbować użyć metody apply_async obiektu Pool, na przykład:

import multiprocessing
import time
from datetime import datetime


def test_func(x):
    time.sleep(2)
    return x**2


def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)

    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])

        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multi: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multi: {}".format(t3 - t2)

Wynik:

100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000

Zaletą tej metody jest to, że wyniki przetworzone przed przerwaniem zostaną zwrócone w słowniku wyników:

>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
bparker856
źródło
Wspaniały i kompletny przykład
eMTy
-5

O dziwo, wygląda na to, że musisz radzić sobie również KeyboardInterruptz dziećmi. Spodziewałbym się, że będzie działać tak, jak napisano ... spróbuj zmienić slowly_squarena:

def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0

To powinno działać zgodnie z oczekiwaniami.

D.Shawley
źródło
1
Próbowałem tego i tak naprawdę nie kończyło to całego zestawu zadań. Kończy aktualnie uruchomione zadania, ale skrypt nadal przypisuje pozostałe zadania w wywołaniu pool.map, tak jakby wszystko było normalne.
Fragsworth
to jest w porządku, ale możesz stracić orientację w występujących błędach. zwrócenie błędu ze stosem śledzenia może zadziałać, więc proces nadrzędny może stwierdzić, że wystąpił błąd, ale nadal nie kończy pracy natychmiast po wystąpieniu błędu.
mehtunguh