Jak uruchamiać zadania asynchroniczne w aplikacjach Python GObject Introspection

16

Piszę aplikację Python + GObject, która po uruchomieniu musi odczytać nietrywialną ilość danych z dysku. Dane są odczytywane synchronicznie i zakończenie operacji odczytu zajmuje około 10 sekund, w tym czasie ładowanie interfejsu użytkownika jest opóźnione.

Chciałbym uruchomić zadanie asynchronicznie i otrzymać powiadomienie, gdy będzie gotowe, bez blokowania interfejsu użytkownika, mniej więcej tak:

def take_ages():
    read_a_huge_file_from_disk()

def on_finished_long_task():
    print "Finished!"

run_long_task(task=take_ages, callback=on_finished_long_task)
load_the_UI_without_blocking_on_long_task()

W przeszłości używałem GTask do tego typu rzeczy, ale martwię się, że jego kod nie został zmieniony od 3 lat, nie mówiąc już o przeniesieniu go do GObject Introspection. Co najważniejsze, nie jest już dostępny w Ubuntu 12.04. Szukam więc łatwego sposobu na asynchroniczne uruchamianie zadań, albo w standardowy sposób w Pythonie, albo w standardowy sposób GObject / GTK +.

Edycja: oto kod z przykładem tego, co próbuję zrobić. Próbowałem, python-deferjak sugerowano w komentarzach, ale nie udało mi się uruchomić asynchronicznie długiego zadania i pozwolić, aby interfejs użytkownika ładował się bez konieczności oczekiwania na zakończenie. Przeglądaj kod testowy .

Czy istnieje prosty i szeroko stosowany sposób uruchamiania zadań asynchronicznych i otrzymywania powiadomień o ich zakończeniu?

David Planella
źródło
Nie jest to ładny przykład, ale jestem pewien, że tego właśnie szukasz: raw.github.com/gist/1132418/...
RobotHumans
Fajnie, myślę, że twoja async_callfunkcja może być tym, czego potrzebuję. Czy mógłbyś rozwinąć ją nieco i dodać odpowiedź, abym mógł ją zaakceptować i podziękować po przetestowaniu? Dzięki!
David Planella
1
Świetne pytanie, bardzo przydatne! ;-)
Rafał Cieślak

Odpowiedzi:

15

Twój problem jest bardzo częsty, dlatego istnieje mnóstwo rozwiązań (szopy, kolejki z wieloprocesowością lub wątkami, pule pracowników, ...)

Ponieważ jest to tak powszechne, istnieje również wbudowane rozwiązanie Pythona (w wersji 3.2, ale w wersji backported tutaj: http://pypi.python.org/pypi/futures ) o nazwie concurrent.futures. „Futures” są dostępne w wielu językach, dlatego python nazywa je tak samo. Oto typowe wywołania (i tutaj jest twój pełny przykład , jednak część db jest zastąpiona przez sleep, zobacz poniżej dlaczego).

from concurrent import futures
executor = futures.ProcessPoolExecutor(max_workers=1)
#executor = futures.ThreadPoolExecutor(max_workers=1)
future = executor.submit(slow_load)
future.add_done_callback(self.on_complete)

Przejdźmy teraz do twojego problemu, który jest o wiele bardziej skomplikowany niż sugeruje Twój prosty przykład. Zasadniczo masz wątki lub procesy do rozwiązania tego problemu, ale oto dlaczego twój przykład jest tak skomplikowany:

  1. Większość implementacji Pythona ma GIL, co sprawia, że ​​wątki nie wykorzystują w pełni multicores. Więc: nie używaj wątków z pythonem!
  2. Obiekty, które chcesz zwrócić slow_loadz bazy danych, nie są wybierane, co oznacza, że ​​nie można ich po prostu przekazywać między procesami. Zatem: brak przetwarzania wieloprocesorowego z wynikami centrum oprogramowania!
  3. Biblioteka, którą wywołujesz (softwarecenter.db), nie jest bezpieczna dla wątków (wydaje się, że zawiera gtk lub podobne), dlatego wywoływanie tych metod w wątku powoduje dziwne zachowanie (w moim teście wszystko, od „działa” po „zrzut pamięci” po proste rezygnacja bez wyników). Tak więc: brak wątków z centrum oprogramowania.
  4. Każde asynchroniczne wywołanie zwrotne w gtk nie powinno robić nic oprócz shedulowania wywołania zwrotnego, które będzie wywoływane w głównej pętli glib. A więc: nie print, bez zmian stanu GTK, z wyjątkiem dodania oddzwaniania!
  5. Gtk i podobne nie działają z wątkami po wyjęciu z pudełka. Musisz to zrobić threads_init, a jeśli wywołasz metodę gtk lub podobną, musisz ją zabezpieczyć (we wcześniejszych wersjach była to gtk.gdk.threads_enter(), gtk.gdk.threads_leave()patrz np. Gstreamer: http://pygstdocs.berlios.de/pygst-tutorial/playbin. HTML ).

Mogę dać ci następującą sugestię:

  1. Przepisz swoje, slow_loadaby zwrócić wybieralne wyniki i korzystać z kontraktów terminowych.
  2. Przełącz się z centrum oprogramowania na Python-apt lub podobne (prawdopodobnie tego nie lubisz). Ale odkąd jesteś zatrudniony w Canonical, możesz poprosić deweloperów centrum oprogramowania bezpośrednio o dodanie dokumentacji do ich oprogramowania (np. Stwierdzenie, że nie jest ono bezpieczne dla wątków), a nawet lepiej, czyniąc centrum wątków bezpiecznym dla oprogramowania.

Jako uwaga: rozwiązań podanych przez innych ( Gio.io_scheduler_push_job, async_call) zrobić pracę z time.sleep, ale nie z softwarecenter.db. Jest tak, ponieważ wszystko sprowadza się do wątków lub procesów i wątków, które nie działają z gtk i softwarecenter.

xubuntix
źródło
Dzięki! Przyjmuję twoją odpowiedź, ponieważ bardzo szczegółowo pokazuje mi, dlaczego nie jest to wykonalne. Niestety, nie mogę używać oprogramowania, które nie jest dla Ubuntu 12.04 pakowane w mojej aplikacji (jest to dla quantal, choć launchpad.net/ubuntu/+source/python-concurrent.futures ), więc myślę, że utknąłem ze nie jest w stanie aby uruchomić moje zadanie asynchronicznie. Jeśli chodzi o notatkę, aby porozmawiać z programistami Software Center, jestem w tej samej sytuacji, co każdy wolontariusz, aby wprowadzić zmiany w kodzie i dokumentacji lub z nimi porozmawiać :-)
David Planella
GIL jest wydawany podczas IO, więc używanie nici jest całkowicie w porządku. Chociaż nie jest konieczne, jeśli używane jest asynchroniczne we / wy.
jfs
10

Oto kolejna opcja przy użyciu harmonogramu we / wy GIO (nigdy wcześniej nie korzystałem z niego w Pythonie, ale poniższy przykład wydaje się działać dobrze).

from gi.repository import GLib, Gio, GObject
import time

def slow_stuff(job, cancellable, user_data):
    print "Slow!"
    for i in xrange(5):
        print "doing slow stuff..."
        time.sleep(0.5)
    print "finished doing slow stuff!"
    return False # job completed

def main():
    GObject.threads_init()
    print "Starting..."
    Gio.io_scheduler_push_job(slow_stuff, None, GLib.PRIORITY_DEFAULT, None)
    print "It's running async..."
    GLib.idle_add(ui_stuff)
    GLib.MainLoop().run()

def ui_stuff():
    print "This is the UI doing stuff..."
    time.sleep(1)
    return True

if __name__ == '__main__':
    main()
Zygfryd Gevatter
źródło
Zobacz także GIO.io_scheduler_job_send_to_mainloop (), jeśli chcesz uruchomić coś w głównym wątku po zakończeniu slow_stuff.
Siegfried Gevatter
Dzięki Sigfried za odpowiedź i przykład. Niestety wydaje się, że przy moim bieżącym zadaniu nie mam szansy użyć interfejsu API Gio, aby uruchomić go asynchronicznie.
David Planella
Było to bardzo przydatne, ale o ile mogę stwierdzić, Gio.io_scheduler_job_send_to_mainloop nie istnieje w Pythonie :(
sil
2

Możesz także użyć GLib.idle_add (oddzwanianie), aby wywołać długo działające zadanie, gdy GLib Mainloop zakończy wszystkie zdarzenia o wyższym priorytecie (które moim zdaniem obejmują zbudowanie interfejsu użytkownika).

mhall119
źródło
Dzięki Mike. Tak, to zdecydowanie pomogłoby w uruchomieniu zadania, gdy interfejs użytkownika jest gotowy. Ale z drugiej strony rozumiem, że kiedy callbackzostanie wywołany, będzie to zrobione synchronicznie, blokując w ten sposób interfejs użytkownika, prawda?
David Planella
Idle_add nie działa w ten sposób. Wykonywanie blokujących połączeń w idle_add jest nadal złym posunięciem i zapobiegnie aktualizacjom interfejsu użytkownika. Nawet asynchroniczny interfejs API może nadal blokować, a jedynym sposobem uniknięcia blokowania interfejsu użytkownika i innych zadań jest wykonanie go w tle.
dobey
Idealnie byłoby, gdybyś podzielił swoje powolne zadanie na części, abyś mógł uruchomić trochę w bezczynnym wywołaniu zwrotnym, powrócić (i pozwolić innym funkcjom, takim jak wywołania zwrotne interfejsu użytkownika), kontynuować pracę po ponownym wywołaniu wywołania zwrotnego i tak dalej na.
Siegfried Gevatter
Gotcha z idle_addtym, że wartość zwracana oddzwonienia ma znaczenie. Jeśli to prawda, zostanie ponownie wywołane.
Flimm
2

Użyj introspekowanego Giointerfejsu API, aby odczytać plik za pomocą jego metod asynchronicznych, a podczas wykonywania pierwszego wywołania wykonaj go jako limit czasu, w GLib.timeout_add_seconds(3, call_the_gio_stuff)którym call_the_gio_stufffunkcja jest zwracana False.

W tym przypadku konieczne jest przekroczenie limitu czasu (może być wymagana inna liczba sekund), ponieważ chociaż wywołania asynchroniczne Gio są asynchroniczne, nie są one blokujące, co oznacza, że ​​duża aktywność dysku podczas odczytu dużego pliku lub dużego liczba plików, może spowodować zablokowanie interfejsu użytkownika, ponieważ interfejs użytkownika i operacje we / wy są nadal w tym samym (głównym) wątku.

Jeśli chcesz napisać własne funkcje, które mają być asynchroniczne, i zintegrować je z główną pętlą, używając interfejsów API we / wy pliku Pythona, musisz napisać kod jako GObject lub przekazać wywołania zwrotne lub użyć, python-deferaby ci pomóc Zrób to. Ale najlepiej jest używać Gio tutaj, ponieważ może przynieść wiele fajnych funkcji, szczególnie jeśli robisz otwieranie / zapisywanie plików w UX.

dobey
źródło
Dzięki @dobey. Właściwie nie czytam bezpośrednio pliku z dysku, prawdopodobnie powinienem to wyjaśnić w oryginalnym poście. Długotrwałym zadaniem, które uruchamiam, jest odczytywanie bazy danych Software Center zgodnie z odpowiedzią na askubuntu.com/questions/139032/... , więc nie jestem pewien, czy mogę korzystać z Giointerfejsu API. Zastanawiałem się, czy istnieje sposób na asynchroniczne uruchamianie dowolnego ogólnego długowiecznego zadania w taki sam sposób, jak robił to GTask.
David Planella
Nie wiem, czym dokładnie jest GTask, ale jeśli masz na myśli gtask.sourceforge.net , to nie sądzę, że powinieneś tego używać. Jeśli to coś innego, to nie wiem, co to jest. Ale wygląda na to, że będziesz musiał wybrać drugą trasę, o której wspomniałem, i zaimplementować asynchroniczny interfejs API, aby owinąć ten kod lub po prostu zrobić to wszystko w wątku.
dobey
W pytaniu jest link do tego. GTask is (was): chergert.github.com/gtask
David Planella
1
Ach, to wygląda bardzo podobnie do API dostarczanego przez python-defer (i twisted's deferred API). Być może powinieneś rozważyć użycie Python-Defer?
dobey
1
Nadal musisz opóźnić to wywołanie, aż do wystąpienia głównych zdarzeń priorytetowych, na przykład za pomocą GLib.idle_add (). W ten sposób: pastebin.ubuntu.com/1011660
dobey
1

Myślę, że należy zauważyć, że jest to skomplikowany sposób robienia tego, co sugerował @mhall.

Zasadniczo masz uruchom to, a następnie uruchom funkcję async_call.

Jeśli chcesz zobaczyć, jak to działa, możesz grać z wyłącznikiem czasowym i klikać przycisk. Jest to w zasadzie to samo co odpowiedź @ mhall, tyle że przykładowy kod.

Na podstawie tego, co nie jest moją pracą.

import threading
import time
from gi.repository import Gtk, GObject



# calls f on another thread
def async_call(f, on_done):
    if not on_done:
        on_done = lambda r, e: None

    def do_call():
        result = None
        error = None

        try:
            result = f()
        except Exception, err:
            error = err

        GObject.idle_add(lambda: on_done(result, error))
    thread = threading.Thread(target = do_call)
    thread.start()

class SlowLoad(Gtk.Window):

    def __init__(self):
        Gtk.Window.__init__(self, title="Hello World")
        GObject.threads_init()        

        self.connect("delete-event", Gtk.main_quit)

        self.button = Gtk.Button(label="Click Here")
        self.button.connect("clicked", self.on_button_clicked)
        self.add(self.button)

        self.file_contents = 'Slow load pending'

        async_call(self.slow_load, self.slow_complete)

    def on_button_clicked(self, widget):
        print self.file_contents

    def slow_complete(self, results, errors):
        '''
        '''
        self.file_contents = results
        self.button.set_label(self.file_contents)
        self.button.show_all()

    def slow_load(self):
        '''
        '''
        time.sleep(5)
        self.file_contents = "Slow load in progress..."
        time.sleep(5)
        return 'Slow load complete'



if __name__ == '__main__':
    win = SlowLoad()
    win.show_all()
    #time.sleep(10)
    Gtk.main()

Dodatkowa uwaga: musisz pozwolić drugiemu wątkowi skończyć się, zanim zostanie poprawnie zakończony lub sprawdzić, czy plik.lock w twoim wątku potomnym.

Edytuj, aby skomentować komentarz:
Początkowo zapomniałem GObject.threads_init(). Najwyraźniej kiedy przycisk zadziałał, zainicjował on dla mnie wątki. To zamaskowało dla mnie błąd.

Ogólnie przepływ polega na utworzeniu okna w pamięci, natychmiast uruchom drugi wątek, gdy wątek się zakończy zaktualizuj przycisk. Dodałem dodatkowy sen, zanim zadzwoniłem do Gtk.main, aby sprawdzić, czy pełna aktualizacja MOGŁA działać, zanim okno zostanie jeszcze narysowane. Skomentowałem to również, aby sprawdzić, czy uruchomienie wątku wcale nie utrudnia rysowania okien.

RobotHumans
źródło
1
Dzięki. Nie jestem pewien, czy mogę to zrobić. Po pierwsze, spodziewałbym slow_loadsię, że zostanie wykonany wkrótce po uruchomieniu interfejsu użytkownika, ale wydaje się, że nie jest wywoływany, chyba że przycisk zostanie kliknięty, co trochę mnie dezorientuje, ponieważ myślałem, że celem tego przycisku było jedynie zapewnienie wizualnego wskazania stanu zadania.
David Planella
Przepraszam, przegapiłem jedną linię. To zrobiło to. Zapomniałem powiedzieć GObject, aby przygotował się na wątki.
RobotHumans
Ale dzwonisz do głównej pętli z wątku, co może powodować problemy, chociaż mogą nie być łatwo ujawnione w twoim trywialnym przykładzie, który nie wykonuje żadnej prawdziwej pracy.
dobey
Ważny punkt, ale nie sądziłem, że trywialny przykład zasługiwał na wysłanie powiadomienia przez DBus (co, jak sądzę, powinna robić nietrywialna aplikacja)
RobotHumans
Hm, uruchamianie async_callw tym przykładzie działa dla mnie, ale powoduje chaos, gdy portuję go do mojej aplikacji i dodam prawdziwą slow_loadfunkcję, którą mam.
David Planella