Usuwanie wszystkich oczekujących zadań w selerze / rabbitmq

199

Jak mogę usunąć wszystkie oczekujące zadania, nie znając ich task_iddla każdego zadania?

nabizan
źródło

Odpowiedzi:

296

Z dokumentów :

$ celery -A proj purge

lub

from proj.celery import app
app.control.purge()

(EDYCJA: Zaktualizowano aktualną metodą.)

Philip Southam
źródło
56
Lub, od Django, dla selera 3.0+: manage.py celery purge( celeryctljest już nieaktualny i zniknie w wersji 3.1).
Henrik Heimbuerger
3
Znalazłem tę odpowiedź, szukając sposobu na wykonanie tego z backendem redis. Najlepszą metodą, jaką znalazłem, była ta, redis-cli KEYS "celery*" | xargs redis-cli DELktóra działała dla mnie. Spowoduje to wymazanie wszystkich zadań przechowywanych w backendzie redis, którego używasz.
Melignus
1
Jak mogę to zrobić w Selerze 3.0?
luistm
2
Dla mnie było to po prostu celery purge(wewnątrz odpowiedniej wirtualnej env).
Ups
Dla Celery 4.0+ w połączeniu z Django jest to ponownie to polecenie, gdzie argumentem -Ajest aplikacja Django, w której się celery.pyznajduje.
gitaarik
120

Dla selera 3.0+:

$ celery purge

Aby wyczyścić określoną kolejkę:

$ celery -Q queue_name purge
ToonAlfrink
źródło
9
Jeśli pojawią się błędy połączenia, upewnij się, że podałeś aplikację, np celery -A proj purge.
Kamil Sindi,
25

W przypadku selera 2.xi 3.x:

Na przykład przy użyciu pracownika z parametrem -Q do definiowania kolejek

celery worker -Q queue1,queue2,queue3

wtedy celery purgenie będzie działać, ponieważ nie można przekazać do niego parametrów kolejki. Usunie tylko domyślną kolejkę. Rozwiązaniem jest uruchomienie pracowników z --purgeparametrem takim jak ten:

celery worker -Q queue1,queue2,queue3 --purge

Spowoduje to jednak uruchomienie pracownika.

Inną opcją jest użycie komendy amqp selera

celery amqp queue.delete queue1
celery amqp queue.delete queue2
celery amqp queue.delete queue3
smido
źródło
Tak, dotyczy to starszych (2.x i może 3.x) wersji selera. Nie mogę edytować odpowiedzi
smido
9

Odkryłem, że celery purgeto nie działa w mojej bardziej złożonej konfiguracji selera. Korzystam z wielu nazwanych kolejek do różnych celów:

$ sudo rabbitmqctl list_queues -p celery name messages consumers
Listing queues ...  # Output sorted, whitespaced for readability
celery                                          0   2
[email protected]                      0   1
[email protected]                      0   1
apns                                            0   1
[email protected]                        0   1
analytics                                       1   1
[email protected]                   0   1
bcast.361093f1-de68-46c5-adff-d49ea8f164c0      0   1
bcast.a53632b0-c8b8-46d9-bd59-364afe9998c1      0   1
celeryev.c27b070d-b07e-4e37-9dca-dbb45d03fd54   0   1
celeryev.c66a9bed-84bd-40b0-8fe7-4e4d0c002866   0   1
celeryev.b490f71a-be1a-4cd8-ae17-06a713cc2a99   0   1
celeryev.9d023165-ab4a-42cb-86f8-90294b80bd1e   0   1

Pierwsza kolumna to nazwa kolejki, druga to liczba wiadomości oczekujących w kolejce, a trzecia to liczba detektorów dla tej kolejki. Kolejki to:

  • seler - Kolejka do standardowych, idempotentnych zadań selera
  • apns - Kolejka zadań Apple Push Notification Service, niezupełnie idempotentna
  • analityka - kolejka do długotrwałej analizy nocnej
  • * .pidbox - Kolejka poleceń roboczych, takich jak zamknięcie i reset, jeden na pracownika (2 pracowników selera, jednego pracownika apns, jednego pracownika analizy)
  • bcast. * - Kolejki rozgłoszeniowe, do wysyłania wiadomości do wszystkich pracowników nasłuchujących na kolejce (zamiast tylko pierwszych, którzy ją przechwytują)
  • celeryev. * - Kolejki zdarzeń selera do raportowania analizy zadań

Zadanie analityczne to zadania typu brute force, które świetnie sprawdzały się w przypadku małych zestawów danych, ale ich przetwarzanie zajmuje teraz ponad 24 godziny. Czasami coś pójdzie nie tak i utknie w oczekiwaniu na bazę danych. Musi zostać przepisany, ale do tego czasu, kiedy się zacina, zabijam zadanie, opróżniam kolejkę i próbuję ponownie. „Zakleszczenie” wykrywam, sprawdzając liczbę komunikatów w kolejce analitycznej, która powinna wynosić 0 (gotowe analizy) lub 1 (oczekiwanie na zakończenie ostatniej nocy analizy). 2 lub wyższy jest zły i dostaję e-mail.

celery purge oferuje usunięcie zadań z jednej z kolejek rozgłoszeniowych i nie widzę opcji wyboru innej nazwanej kolejki.

Oto mój proces:

$ sudo /etc/init.d/celeryd stop  # Wait for analytics task to be last one, Ctrl-C
$ ps -ef | grep analytics  # Get the PID of the worker, not the root PID reported by celery
$ sudo kill <PID>
$ sudo /etc/init.d/celeryd stop  # Confim dead
$ python manage.py celery amqp queue.purge analytics
$ sudo rabbitmqctl list_queues -p celery name messages consumers  # Confirm messages is 0
$ sudo /etc/init.d/celeryd start
jwhitlock
źródło
Ale nie odpowiedź, prawda? Jednak bardzo pouczające!
rano
4
celeryctl purgenie działał z nazwanymi kolejkami. python manage.py celery amqp queue.purge <queue_name>zrobił. Myślę, że kontekst jest przydatny dla osób ze złożonymi ustawieniami, aby mogli dowiedzieć się, co powinni zrobić, jeśli się celeryctl purgenie powiedzie.
jwhitlock
Nie mogę znaleźć manage.pyw moim Selerze 3.1.17, czy plik został usunięty, czy po prostu dawał klapsy nowy? Znalazłem jednak coś, co wygląda jak odpowiedni interfejs ( queue.purge) */bin/amqp.py. Ale po próbie skorelowania zawartości pliku z dokumentacją muszę niestety przyznać, że Celery jest żałośnie nieudokumentowana, a także bardzo skomplikowana, przynajmniej oceniając kod źródłowy.
amn.
manage.pyjest skryptem zarządzania Django i manage.py celeryuruchamia seler po załadowaniu konfiguracji z ustawień Django. Nie używałem selera poza Django, ale dołączone celerypolecenie może być tym, czego szukasz: celery.readthedocs.org/en/latest/userguide/monitoring.html
jwhitlock
5

W Selerze 3+

http://docs.celeryproject.org/en/3.1/faq.html#how-do-i-purge-all-waiting-tasks

CLI

Wyczyść nazwaną kolejkę:

 celery -A proj amqp queue.purge <queue name>

Wyczyść skonfigurowaną kolejkę

celery -A proj purge

Czyściłem wiadomości, ale nadal są wiadomości w kolejce? Odpowiedź: Zadania są potwierdzane (usuwane z kolejki), jak tylko zostaną faktycznie wykonane. Po otrzymaniu zadania pracownik może zająć trochę czasu, zanim zostanie on faktycznie wykonany, zwłaszcza jeśli na wykonanie czeka już wiele zadań. Wiadomości, które nie są potwierdzane, są zatrzymywane przez pracownika do czasu zamknięcia połączenia z brokerem (serwerem AMQP). Po zamknięciu tego połączenia (np. Z powodu zatrzymania pracownika) zadania zostaną ponownie wysłane przez brokera do następnego dostępnego pracownika (lub tego samego pracownika po ponownym uruchomieniu), aby poprawnie wyczyścić kolejkę zadań oczekujących muszą zatrzymać wszystkich pracowników, a następnie wyczyścić zadania za pomocą celery.control.purge ().

Aby oczyścić całą kolejkę, należy zatrzymać pracowników.

oneklc
źródło
5

Jeśli chcesz usunąć wszystkie oczekujące zadania, a także aktywne i zastrzeżone, aby całkowicie zatrzymać Selera, to działało dla mnie:

from proj.celery import app
from celery.task.control import inspect, revoke

# remove pending tasks
app.control.purge()

# remove active tasks
i = inspect()
jobs = i.active()
for hostname in jobs:
    tasks = jobs[hostname]
    for task in tasks:
        revoke(task['id'], terminate=True)

# remove reserved tasks
jobs = i.reserved()
for hostname in jobs:
    tasks = jobs[hostname]
    for task in tasks:
        revoke(task['id'], terminate=True)
Kahlo
źródło
2

1. Aby poprawnie wyczyścić kolejkę zadań oczekujących, musisz zatrzymać wszystkich pracowników ( http://celery.readthedocs.io/en/latest/faq.html#i-ve-purged-messages-but-there-are- nadal-wiadomości-pozostawione w kolejce ):

$ sudo rabbitmqctl stop

lub (w przypadku gdy RabbitMQ / broker wiadomości jest zarządzany przez Inspektora):

$ sudo supervisorctl stop all

2. ... a następnie usuń zadania z określonej kolejki:

$ cd <source_dir>
$ celery amqp queue.purge <queue name>

3. Uruchom RabbitMQ:

$ sudo rabbitmqctl start

lub (w przypadku gdy RabbitMQ jest zarządzany przez Inspektora):

$ sudo supervisorctl start all
Ukr
źródło
2

komenda celery 4+ seler purge, aby wyczyścić wszystkie skonfigurowane kolejki zadań

celery -A *APPNAME* purge

programowo:

from proj.celery import app
app.control.purge()

wszystkie oczekujące zadania zostaną usunięte. Referencja: seler naciowy

Roshan Bagdiya
źródło