Anulować już wykonywane zadanie za pomocą Selera?

96

Czytałem dokument i szukałem, ale nie mogę znaleźć prostej odpowiedzi:

Czy możesz anulować już wykonywane zadanie? (ponieważ zadanie zostało rozpoczęte, zajmuje trochę czasu, aw połowie musi zostać anulowane)

Znalazłem to z dokumentu w Celery FAQ

>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()

Ale nie jestem pewien, czy spowoduje to anulowanie zadań w kolejce, czy też zabije działający proces na poziomie roboczym. Dzięki za każde światło, które możesz rzucić!

dcoffey3296
źródło

Odpowiedzi:

185

odwołanie anuluje wykonanie zadania. Jeśli zadanie zostanie odwołane, pracownicy ignorują je i nie wykonują go. Jeśli nie używasz trwałych odwołań, Twoje zadanie może zostać wykonane po ponownym uruchomieniu pracownika.

http://docs.celeryproject.org/en/latest/userguide/workers.html#worker-persistent-revokes

revoke ma opcję zakończenia, która domyślnie ma wartość False . Jeśli chcesz zabić wykonywane zadanie, musisz ustawić terminate na True .

>>> from celery.task.control import revoke
>>> revoke(task_id, terminate=True)

http://docs.celeryproject.org/en/latest/userguide/workers.html#revoke-revoking-tasks

mher
źródło
3
To jest dokładnie wyjaśnienie, którego szukałem, dziękuję!
dcoffey3296
1
Czy to działa w rozproszonym środowisku? Mam na myśli to, że mam pracowników na wielu maszynach, które wykonują zadania. Czy seler śledzi, na której maszynie jest wykonywane zadanie?
ksrini
1
To robi. Komunikacja z pracownikami odbywa się za pośrednictwem brokera.
mher
5
result.revoke (terminate = True) powinno robić to samo, co revoke (task_id, terminate = True)
CamHart
10
Ponadto użycie opcji zakończenia jest „ostatecznością dla administratorów”, zgodnie z ostatnią dokumentacją firmy Celery. Istnieje ryzyko przerwania innego zadania, które zostało niedawno uruchomione dla tego pracownika.
kouk
38

W Celery 3.1 zmieniono API odwoływania zadań .

Zgodnie z często zadawanymi pytaniami dotyczącymi selera , powinieneś użyć result.revoke:

>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()

lub jeśli masz tylko identyfikator zadania:

>>> from proj.celery import app
>>> app.control.revoke(task_id)
Rockallite
źródło
25

@ Odpowiedź 0x00mh jest prawidłowa, jednak niedawne seler docs powiedzieć, że korzystając z terminateopcji to „ ostatnia deska ratunku dla administratorów ”, ponieważ można przypadkowo zakończenia innego zadania, który rozpoczął wykonywanie w międzyczasie. Prawdopodobnie lepszym rozwiązaniem jest połączenie terminate=Truez signal='SIGUSR1'(co powoduje zgłoszenie wyjątku SoftTimeLimitExceeded w zadaniu).

kouk
źródło
2
U mnie to rozwiązanie bardzo dobrze się sprawdziło. Gdy SoftTimeLimitExceededjest zgłaszane w moim zadaniu, wywoływana jest moja niestandardowa logika czyszczenia (zaimplementowana za pomocą try/ except/ finally). Moim zdaniem jest to o wiele lepsze niż to, co AbortableTaskoferuje ( docs.celeryproject.org/en/latest/reference/ ... ). W tym drugim przypadku potrzebujesz zaplecza wyników bazy danych i musisz ręcznie i wielokrotnie sprawdzać stan trwającego zadania, aby zobaczyć, czy zostało przerwane.
David Schneider
2
Jak to jest lepsze, o ile rozumiem, jeśli jest jakieś inne zadanie odebrane przez proces, i tak zostanie zatrzymane, tylko inny wyjątek zostanie wyrzucony.
marxin
Jeśli używam, worker_prefetch_multiplier = 1ponieważ mam tylko kilka długo działających zadań, zakończenie powinno być w porządku - ponieważ żadne inne zadania nie zostaną wykonane przez zakończenie - czy otrzymałem to poprawnie? @spicyramen
maffe
1

Zobacz następujące opcje zadań: time_limit , soft_time_limit (lub możesz ustawić to dla pracowników). Jeśli chcesz kontrolować nie tylko czas wykonania, a następnie zobaczyć wygasa argument metody apply_async.

simplylizz
źródło