Jak sprawdzić status zadania w Selerze?

95

Jak sprawdzić, czy zadanie działa w selerze (konkretnie używam selera-django)?

Przeczytałem dokumentację i wyszukałem w Google, ale nie widzę połączenia takiego jak:

my_example_task.state() == RUNNING

Moim przypadkiem jest to, że mam zewnętrzną usługę (java) do transkodowania. Kiedy wysyłam dokument do transkodowania, chcę sprawdzić, czy zadanie, które uruchamia tę usługę, jest uruchomione, a jeśli nie, to (ponownie) je uruchomić.

Używam aktualnych stabilnych wersji - chyba 2.4.

Marcin
źródło

Odpowiedzi:

98

Zwróć task_id (który jest podany z .delay ()) i zapytaj następnie instancję selera o stan:

x = method.delay(1,2)
print x.task_id

Pytając, uzyskaj nowy wynik AsyncResult, używając tego task_id:

from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()
Gregor
źródło
11
Dzięki, ale co jeśli nie mam dostępu do x?
Marcin
4
Gdzie umieszczasz swoje prace w selerze? Tam musisz zwrócić task_id, aby śledzić zadanie w przyszłości.
Gregor
W przeciwieństwie do @ Marcin, ta odpowiedź nie używa statycznej metody Task.AsyncResult () jako fabryki AsyncResult, która pożytecznie ponownie wykorzystuje konfigurację zaplecza, w przeciwnym razie podczas próby uzyskania wyniku zostanie zgłoszony błąd.
ArnauOrriols
2
@Chris Kontrowersje związane z kodem @gregor dotyczą tworzenia instancji async_result. Jeśli masz już instancję, możesz zacząć. Ale co się stanie, jeśli masz tylko identyfikator zadania i potrzebujesz utworzyć async_resultinstancję, aby móc wywołać async_result.get()? To jest instancja AsyncResultklasy, ale nie możesz użyć surowej klasy celery.result.AsyncResult, musisz pobrać klasę z funkcji opakowanej przez app.task(). W takim razie zrobiłbyś toasync_result = run_instance.AsyncResult('task-id')
ArnauOrriols
1
but you cannot use the raw class celery.result.AsyncResult, you need to get the class from the function wrapped by app.task(). - Myślę, że tak właśnie miało być używane. Przeczytaj kod: github.com/celery/celery/blob/ ...
nevelis
74

Utworzenie AsyncResultobiektu z identyfikatora zadania jest sposobem zalecanym w FAQ, aby uzyskać status zadania, gdy jedyne, co masz, to identyfikator zadania.

Jednak od wersji Selery 3.x istnieją poważne zastrzeżenia, które mogą ugryźć ludzi, jeśli nie zwrócą na nich uwagi. To naprawdę zależy od konkretnego scenariusza użycia.

Domyślnie Celery nie rejestruje stanu „uruchomiony”.

Aby Celery mógł zarejestrować, że zadanie jest uruchomione, musisz ustawić task_track_startedna True. Oto proste zadanie, które to sprawdza:

@app.task(bind=True)
def test(self):
    print self.AsyncResult(self.request.id).state

Kiedy task_track_startedjest False, co jest wartością domyślną, stan jest wyświetlany, PENDINGnawet jeśli zadanie zostało uruchomione. Jeśli ustawisz task_track_startedna True, stan będzie STARTED.

Stan PENDINGoznacza „nie wiem”.

Ze AsyncResultstanem PENDINGnie znaczy nic więcej niż to, że Seler nie zna statusu zadania. Może to mieć wiele przyczyn.

Po pierwsze, AsyncResultmożna je skonstruować z nieprawidłowymi identyfikatorami zadań. Takie „zadania” zostaną uznane przez Selera za oczekujące:

>>> task.AsyncResult("invalid").status
'PENDING'

Ok, więc nikt nie będzie podawał oczywiście nieprawidłowych identyfikatorów AsyncResult. W porządku, ale ma to również wpływ na AsyncResultuwzględnienie zadania, które zostało pomyślnie wykonane, ale o którym Seler zapomniał PENDING. Ponownie, w niektórych scenariuszach użycia może to stanowić problem. Część problemu zależy od tego, w jaki sposób Seler jest skonfigurowany do przechowywania wyników zadań, ponieważ zależy to od dostępności „nagrobków” w zapleczu wyników. ( „Nagrobki” jest długotrwałe stosowanie w dokumentacji naciowego na kawałki danych, rekord w jaki sposób zadanie zakończone.) Używanie AsyncResultnie będzie działać, jeśli w ogóle task_ignore_resultjest True. Bardziej irytującym problemem jest to, że Seler domyślnie traci ważność na nagrobkach. Plikresult_expiresustawienie domyślne to 24 godziny. Więc jeśli uruchomisz zadanie i zapiszesz identyfikator w pamięci długoterminowej, a później 24 godziny później, utworzysz AsyncResultz nim, status będzie PENDING.

Wszystkie „rzeczywiste zadania” zaczynają się w PENDINGstanie. Więc PENDINGpodjęcie zadania może oznaczać, że zadanie zostało poproszone, ale nigdy nie posunęło się dalej (z jakiegokolwiek powodu). Albo może oznaczać, że zadanie zostało wykonane, ale Celery zapomniał o swoim stanie.

Auć! AsyncResultnie zadziała dla mnie. Co jeszcze mogę zrobić?

Wolę śledzić cele niż same zadania . Zachowuję pewne informacje o zadaniach, ale jest to naprawdę drugorzędne w stosunku do śledzenia celów. Cele są przechowywane w magazynie niezależnym od selera. Kiedy żądanie wymaga wykonania obliczeń zależy od osiągnięcia jakiegoś celu, sprawdza, czy cel został już osiągnięty, jeśli tak, używa tego celu zapisanego w pamięci podręcznej, w przeciwnym razie rozpoczyna zadanie, które będzie miało wpływ na cel i wysyła do klient, który wysłał żądanie HTTP, otrzymał odpowiedź wskazującą, że powinien czekać na wynik.


Nazwy zmiennych i hiperłącza powyżej dotyczą programu Celery 4.x. W 3.x odpowiednie zmienne i linki są: CELERY_TRACK_STARTED, CELERY_IGNORE_RESULT, CELERY_TASK_RESULT_EXPIRES.

Louis
źródło
Więc jeśli chcę później sprawdzić wynik (może nawet w innym procesie), lepiej będzie, jeśli mam własną implementację? Zapisujesz wynik ręcznie w bazie danych?
Franklin Yu
Tak, oddzieliłbym śledzenie „celu” od śledzenia „zadań”. Napisałem „wykonaj obliczenia, które zależą od jakiegoś celu”. Zwykle „cel” jest również obliczeniem. Na przykład, jeśli chcę pokazać użytkownikowi artykuł X, muszę przekonwertować go z XML na HTML, ale wcześniej musiałem rozwiązać wszystkie odniesienia bibliograficzne. (X jest jak artykuł w czasopiśmie). Sprawdzam, czy cel „artykuł X z rozwiązanymi wszystkimi odniesieniami bibliograficznymi” istnieje i używam go, zamiast próbować sprawdzić status zadania selera, który obliczyłby mój cel.
Louis
A informacja „artykuł X z rozwiązanymi wszystkimi odniesieniami bibliograficznymi” jest przechowywana w pamięci podręcznej i przechowywana w bazie danych eXist-db.
Louis
63

Każdy Taskobiekt ma .requestwłaściwość, która zawiera AsyncRequestobiekt. W związku z tym następujący wiersz przedstawia stan zadania task:

task.AsyncResult(task.request.id).state
Marcin
źródło
2
Czy istnieje sposób na przechowywanie procentowego postępu zadania?
patrick
5
Kiedy to robię, otrzymuję trwale OCZEKUJĄCY wynik AsyncResult, nawet jeśli zaczekam wystarczająco długo na zakończenie zadania. Czy istnieje sposób na to, aby zobaczyć zmiany stanu? Wydaje mi się, że mój backend jest skonfigurowany i próbowałem ustawić CELERY_TRACK_STARTED = True bezskutecznie.
dstromberg
1
@dstromberg Niestety minęły 4 lata, odkąd był to dla mnie problem, więc nie mogę pomóc. Prawie na pewno musisz skonfigurować seler do śledzenia statusu.
Marcin
Dodając dalej do obserwacji @ dstromberg, tylko dla potwierdzenia, wybrałem zadanie selera, o którym wiedziałem na pewno, że się powiodło i sprawdziłem jego statewłaściwość, nadal wróciło PENDING. Wydaje się, że nie jest to niezawodny sposób śledzenia stanu zadań selera z terminala. Dodatkowo mam uruchomiony kwiat selera (narzędzie do monitorowania selera), z jakiegoś powodu nie wyświetlało zadań, których szukałem na liście zadań, które wykonał. Być może będę musiał zajrzeć do ustawień Kwiatów, aby sprawdzić, czy jest coś, co mówi, że pojawia się tylko do niektórych godzin w przeszłości.
Głęboki
16

Możesz także tworzyć niestandardowe stany i aktualizować wykonanie zadania wartościowego. Ten przykład pochodzi z dokumentów:

@app.task(bind=True)
def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta={'current': i, 'total': len(filenames)})

http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states

msangel
źródło
11

Stare pytanie, ale ostatnio napotkałem ten problem.

Jeśli próbujesz uzyskać task_id, możesz to zrobić w następujący sposób:

import celery
from celery_app import add
from celery import uuid

task_id = uuid()
result = add.apply_async((2, 2), task_id=task_id)

Teraz wiesz dokładnie, czym jest task_id i możesz go teraz użyć do uzyskania wyniku AsyncResult:

# grab the AsyncResult 
result = celery.result.AsyncResult(task_id)

# print the task id
print result.task_id
09dad9cf-c9fa-4aee-933f-ff54dae39bdf

# print the AsyncResult's status
print result.status
SUCCESS

# print the result returned 
print result.result
4
Cesar Rios
źródło
4
Nie ma absolutnie potrzeby tworzenia własnego identyfikatora zadania i przekazywania go apply_async. Obiekt zwrócony przez apply_async to AsyncResultobiekt, który ma identyfikator zadania, które wygenerował Celery.
Louis,
1
Popraw mnie, jeśli się mylę, ale czy czasami nie jest przydatne generowanie UUID na podstawie niektórych danych wejściowych, tak aby wszystkie wywołania otrzymujące te same dane wejściowe otrzymały ten sam UUID? IOW, może czasami warto określić identyfikator zadania.
dstromberg
1
@dstromberg Pytanie zadane przez OP brzmi „jak sprawdzić stan zadania”, a odpowiedź tutaj brzmi „Jeśli próbujesz uzyskać identyfikator zadania ...”. Ani sprawdzanie statusu zadania, ani otrzymywanie nie task_idwymagają samodzielnego wygenerowania identyfikatora zadania. W swoim komentarzu wyobraziłeś sobie powód, który wykracza poza „jak mogę sprawdzić stan zadania” i „Jeśli próbujesz uzyskać identyfikator zadania ...` Świetnie, jeśli masz taką potrzebę, ale tak nie jest tutaj. (Poza tym użycie uuid()do wygenerowania identyfikatora zadania nie robi absolutnie nic poza tym, co domyślnie robi Celery.)
Louis
Zgadzam się, że OP nie pytał konkretnie, jak uzyskać przewidywalne identyfikatory zadań, ale odpowiedź na pytanie OP brzmi obecnie „śledź identyfikator zadania i wykonaj x”. Wydaje mi się, że śledzenie identyfikatora zadania jest niepraktyczne w wielu różnych sytuacjach, więc odpowiedź może w rzeczywistości nie być satysfakcjonująca. Ta odpowiedź pomaga mi rozwiązać mój przypadek użycia (jeśli mogę pokonać inne odnotowane ograniczenia) z tego samego powodu, dla którego @dstromberg wskazuje - czy było to zmotywowane z tego powodu.
claytond
7

Po prostu użyj tego API z FAQ selera

result = app.AsyncResult(task_id)

To działa dobrze.

David Ding
źródło
1

Odpowiedź z 2020 r .:

#### tasks.py
@celery.task()
def mytask(arg1):
    print(arg1)

#### blueprint.py
@bp.route("/args/arg1=<arg1>")
def sleeper(arg1):
    process = mytask.apply_async(args=(arg1,)) #mytask.delay(arg1)
    state = process.state
    return f"Thanks for your patience, your job {process.task_id} \
             is being processed. Status {state}"
Adrian Garcia Moreno
źródło
0

Próbować:

task.AsyncResult(task.request.id).state

zapewni to status zadania selera. Jeśli zadanie selera jest już w stanie FAILURE , zgłosi wyjątek:

raised unexpected: KeyError('exc_type',)

gogasca
źródło
0

Znalazłem pomocne informacje w

Przewodnik dla pracowników projektu Seler

W moim przypadku sprawdzam, czy działa Seler.

inspect_workers = task.app.control.inspect()
if inspect_workers.registered() is None:
    state = 'FAILURE'
else:
    state = str(task.state) 

Możesz bawić się inspekcją, aby zaspokoić swoje potrzeby.

zerocog
źródło
0
  • Najpierw , w aplikacji selera :

vi my_celery_apps / app1.py

app = Celery(worker_name)
  • a następnie przejdź do pliku zadania , importuj aplikację z modułu aplikacji selera.

vi zadania / task1.py

from my_celery_apps.app1 import app

app.AsyncResult(taskid)

try:
   if task.state.lower() != "success":
        return
except:
    """ do something """

Ty ZhengChuan
źródło
-1

Oprócz powyższego Podejście programowe Status zadania kwiatowego można łatwo zobaczyć.

Monitorowanie w czasie rzeczywistym przy użyciu zdarzeń selera. Flower to internetowe narzędzie do monitorowania i administrowania klastrami selera.

  1. Postęp i historia zadań
  2. Możliwość wyświetlania szczegółów zadania (argumenty, czas rozpoczęcia, czas działania i więcej)
  3. Wykresy i statystyki

Oficjalny dokument: kwiat - narzędzie do monitorowania selera

Instalacja:

$ pip install flower

Stosowanie:

http://localhost:5555
Roshan Bagdiya
źródło
-1
res = method.delay()
    
print(f"id={res.id}, state={res.state}, status={res.status} ")

print(res.get())
Saurabh I
źródło
2
Nie wysyłaj tylko kodu jako odpowiedzi, ale także wyjaśnij, co robi twój kod i jak rozwiązuje problem pytania. Odpowiedzi z wyjaśnieniem są zwykle bardziej pomocne i lepszej jakości oraz częściej przyciągają głosy za.
Mark Rotteveel