Jak sprawdzić, czy zadanie działa w selerze (konkretnie używam selera-django)?
Przeczytałem dokumentację i wyszukałem w Google, ale nie widzę połączenia takiego jak:
my_example_task.state() == RUNNING
Moim przypadkiem jest to, że mam zewnętrzną usługę (java) do transkodowania. Kiedy wysyłam dokument do transkodowania, chcę sprawdzić, czy zadanie, które uruchamia tę usługę, jest uruchomione, a jeśli nie, to (ponownie) je uruchomić.
Używam aktualnych stabilnych wersji - chyba 2.4.
python
web-services
celery
django-celery
Marcin
źródło
źródło
x
?async_result
. Jeśli masz już instancję, możesz zacząć. Ale co się stanie, jeśli masz tylko identyfikator zadania i potrzebujesz utworzyćasync_result
instancję, aby móc wywołaćasync_result.get()
? To jest instancjaAsyncResult
klasy, ale nie możesz użyć surowej klasycelery.result.AsyncResult
, musisz pobrać klasę z funkcji opakowanej przezapp.task()
. W takim razie zrobiłbyś toasync_result = run_instance.AsyncResult('task-id')
but you cannot use the raw class celery.result.AsyncResult, you need to get the class from the function wrapped by app.task().
- Myślę, że tak właśnie miało być używane. Przeczytaj kod: github.com/celery/celery/blob/ ...Utworzenie
AsyncResult
obiektu z identyfikatora zadania jest sposobem zalecanym w FAQ, aby uzyskać status zadania, gdy jedyne, co masz, to identyfikator zadania.Jednak od wersji Selery 3.x istnieją poważne zastrzeżenia, które mogą ugryźć ludzi, jeśli nie zwrócą na nich uwagi. To naprawdę zależy od konkretnego scenariusza użycia.
Domyślnie Celery nie rejestruje stanu „uruchomiony”.
Aby Celery mógł zarejestrować, że zadanie jest uruchomione, musisz ustawić
task_track_started
naTrue
. Oto proste zadanie, które to sprawdza:@app.task(bind=True) def test(self): print self.AsyncResult(self.request.id).state
Kiedy
task_track_started
jestFalse
, co jest wartością domyślną, stan jest wyświetlany,PENDING
nawet jeśli zadanie zostało uruchomione. Jeśli ustawisztask_track_started
naTrue
, stan będzieSTARTED
.Stan
PENDING
oznacza „nie wiem”.Ze
AsyncResult
stanemPENDING
nie znaczy nic więcej niż to, że Seler nie zna statusu zadania. Może to mieć wiele przyczyn.Po pierwsze,
AsyncResult
można je skonstruować z nieprawidłowymi identyfikatorami zadań. Takie „zadania” zostaną uznane przez Selera za oczekujące:>>> task.AsyncResult("invalid").status 'PENDING'
Ok, więc nikt nie będzie podawał oczywiście nieprawidłowych identyfikatorów
AsyncResult
. W porządku, ale ma to również wpływ naAsyncResult
uwzględnienie zadania, które zostało pomyślnie wykonane, ale o którym Seler zapomniałPENDING
. Ponownie, w niektórych scenariuszach użycia może to stanowić problem. Część problemu zależy od tego, w jaki sposób Seler jest skonfigurowany do przechowywania wyników zadań, ponieważ zależy to od dostępności „nagrobków” w zapleczu wyników. ( „Nagrobki” jest długotrwałe stosowanie w dokumentacji naciowego na kawałki danych, rekord w jaki sposób zadanie zakończone.) UżywanieAsyncResult
nie będzie działać, jeśli w ogóletask_ignore_result
jestTrue
. Bardziej irytującym problemem jest to, że Seler domyślnie traci ważność na nagrobkach. Plikresult_expires
ustawienie domyślne to 24 godziny. Więc jeśli uruchomisz zadanie i zapiszesz identyfikator w pamięci długoterminowej, a później 24 godziny później, utworzyszAsyncResult
z nim, status będziePENDING
.Wszystkie „rzeczywiste zadania” zaczynają się w
PENDING
stanie. WięcPENDING
podjęcie zadania może oznaczać, że zadanie zostało poproszone, ale nigdy nie posunęło się dalej (z jakiegokolwiek powodu). Albo może oznaczać, że zadanie zostało wykonane, ale Celery zapomniał o swoim stanie.Auć!
AsyncResult
nie zadziała dla mnie. Co jeszcze mogę zrobić?Wolę śledzić cele niż same zadania . Zachowuję pewne informacje o zadaniach, ale jest to naprawdę drugorzędne w stosunku do śledzenia celów. Cele są przechowywane w magazynie niezależnym od selera. Kiedy żądanie wymaga wykonania obliczeń zależy od osiągnięcia jakiegoś celu, sprawdza, czy cel został już osiągnięty, jeśli tak, używa tego celu zapisanego w pamięci podręcznej, w przeciwnym razie rozpoczyna zadanie, które będzie miało wpływ na cel i wysyła do klient, który wysłał żądanie HTTP, otrzymał odpowiedź wskazującą, że powinien czekać na wynik.
Nazwy zmiennych i hiperłącza powyżej dotyczą programu Celery 4.x. W 3.x odpowiednie zmienne i linki są:
CELERY_TRACK_STARTED
,CELERY_IGNORE_RESULT
,CELERY_TASK_RESULT_EXPIRES
.źródło
Każdy
Task
obiekt ma.request
właściwość, która zawieraAsyncRequest
obiekt. W związku z tym następujący wiersz przedstawia stan zadaniatask
:źródło
state
właściwość, nadal wróciłoPENDING
. Wydaje się, że nie jest to niezawodny sposób śledzenia stanu zadań selera z terminala. Dodatkowo mam uruchomiony kwiat selera (narzędzie do monitorowania selera), z jakiegoś powodu nie wyświetlało zadań, których szukałem na liście zadań, które wykonał. Być może będę musiał zajrzeć do ustawień Kwiatów, aby sprawdzić, czy jest coś, co mówi, że pojawia się tylko do niektórych godzin w przeszłości.Możesz także tworzyć niestandardowe stany i aktualizować wykonanie zadania wartościowego. Ten przykład pochodzi z dokumentów:
@app.task(bind=True) def upload_files(self, filenames): for i, file in enumerate(filenames): if not self.request.called_directly: self.update_state(state='PROGRESS', meta={'current': i, 'total': len(filenames)})
http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states
źródło
Stare pytanie, ale ostatnio napotkałem ten problem.
Jeśli próbujesz uzyskać task_id, możesz to zrobić w następujący sposób:
import celery from celery_app import add from celery import uuid task_id = uuid() result = add.apply_async((2, 2), task_id=task_id)
Teraz wiesz dokładnie, czym jest task_id i możesz go teraz użyć do uzyskania wyniku AsyncResult:
# grab the AsyncResult result = celery.result.AsyncResult(task_id) # print the task id print result.task_id 09dad9cf-c9fa-4aee-933f-ff54dae39bdf # print the AsyncResult's status print result.status SUCCESS # print the result returned print result.result 4
źródło
apply_async
. Obiekt zwrócony przezapply_async
toAsyncResult
obiekt, który ma identyfikator zadania, które wygenerował Celery.task_id
wymagają samodzielnego wygenerowania identyfikatora zadania. W swoim komentarzu wyobraziłeś sobie powód, który wykracza poza „jak mogę sprawdzić stan zadania” i „Jeśli próbujesz uzyskać identyfikator zadania ...` Świetnie, jeśli masz taką potrzebę, ale tak nie jest tutaj. (Poza tym użycieuuid()
do wygenerowania identyfikatora zadania nie robi absolutnie nic poza tym, co domyślnie robi Celery.)Po prostu użyj tego API z FAQ selera
To działa dobrze.
źródło
Odpowiedź z 2020 r .:
#### tasks.py @celery.task() def mytask(arg1): print(arg1) #### blueprint.py @bp.route("/args/arg1=<arg1>") def sleeper(arg1): process = mytask.apply_async(args=(arg1,)) #mytask.delay(arg1) state = process.state return f"Thanks for your patience, your job {process.task_id} \ is being processed. Status {state}"
źródło
Próbować:
task.AsyncResult(task.request.id).state
zapewni to status zadania selera. Jeśli zadanie selera jest już w stanie FAILURE , zgłosi wyjątek:
raised unexpected: KeyError('exc_type',)
źródło
w przypadku prostych zadań do monitorowania możemy użyć http://flower.readthedocs.io/en/latest/screenshots.html i http://policystat.github.io/jobtastic/ .
a w przypadku skomplikowanych zadań powiedzmy zadanie, które obejmuje wiele innych modułów. Zalecamy ręczne rejestrowanie postępu i komunikatu w określonej jednostce zadaniowej.
źródło
Znalazłem pomocne informacje w
Przewodnik dla pracowników projektu Seler
W moim przypadku sprawdzam, czy działa Seler.
inspect_workers = task.app.control.inspect() if inspect_workers.registered() is None: state = 'FAILURE' else: state = str(task.state)
Możesz bawić się inspekcją, aby zaspokoić swoje potrzeby.
źródło
vi my_celery_apps / app1.py
vi zadania / task1.py
from my_celery_apps.app1 import app app.AsyncResult(taskid) try: if task.state.lower() != "success": return except: """ do something """
źródło
Oprócz powyższego Podejście programowe Status zadania kwiatowego można łatwo zobaczyć.
Monitorowanie w czasie rzeczywistym przy użyciu zdarzeń selera. Flower to internetowe narzędzie do monitorowania i administrowania klastrami selera.
Oficjalny dokument: kwiat - narzędzie do monitorowania selera
Instalacja:
Stosowanie:
http://localhost:5555
źródło
res = method.delay() print(f"id={res.id}, state={res.state}, status={res.status} ") print(res.get())
źródło