from celery.app.control importInspect# Inspect all nodes.
i =Inspect()# Show the items that have an ETA or are scheduled for later processing
i.scheduled()# Show tasks that are currently active.
i.active()# Show tasks that have been claimed by workers
i.reserved()
Próbowałem, ale to naprawdę powolne (jak 1 sekunda). Używam go synchronicznie w aplikacji tornado do monitorowania postępów, więc musi być szybki.
JulienFr
41
Nie zwróci to listy zadań w kolejce, które nie zostały jeszcze przetworzone.
Ed J
9
Użyj, i.reserved()aby uzyskać listę zadań w kolejce.
Banan
4
Czy ktoś doświadczył, że i.reserved () nie ma dokładnej listy aktywnych zadań? Mam uruchomione zadania, których nie ma na liście. Jestem na django-seler == 3.1.10
Seperman
6
Przy określaniu pracownika musiałem użyć listy jako argument: inspect(['celery@Flatty']). Ogromna poprawa szybkości inspect().
Adversus
42
jeśli używasz rabbitMQ, użyj tego w terminalu:
sudo rabbitmqctl list_queues
wypisze listę kolejek z liczbą oczekujących zadań. na przykład:
Znam się na tym, gdy mam uprawnienia sudo, ale chcę, aby nieuprzywilejowany użytkownik systemu mógł to sprawdzić - jakieś sugestie?
sage
Ponadto możesz to przepuścić, grep -e "^celery\s" | cut -f2aby wyodrębnić, że 166jeśli chcesz przetworzyć tę liczbę później, powiedzmy dla statystyk.
jamesc
21
Jeśli nie używasz zadań z priorytetami, jest to całkiem proste, jeśli używasz Redis. Aby otrzymać zadanie liczy się:
redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME
Ale zadania z priorytetami używają innego klucza w redis , więc pełny obraz jest nieco bardziej skomplikowany. Pełny obraz jest taki, że należy zapytać redis o każdy priorytet zadania. W Pythonie (i z projektu Flower) wygląda to tak:
PRIORITY_SEP ='\x06\x16'
DEFAULT_PRIORITY_STEPS =[0,3,6,9]def make_queue_name_for_pri(queue, pri):"""Make a queue name for redis
Celery uses PRIORITY_SEP to separate different priorities of tasks into
different queues in Redis. Each queue-priority combination becomes a key in
redis with names like:
- batch1\x06\x163 <-- P3 queue named batch1
There's more information about this in Github, but it doesn't look like it
will change any time soon:
- https://github.com/celery/kombu/issues/422
In that ticket the code below, from the Flower project, is referenced:
- https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135
:param queue: The name of the queue to make a name for.
:param pri: The priority to make a name with.
:return: A name for the queue-priority pair.
"""if pri notin DEFAULT_PRIORITY_STEPS:raiseValueError('Priority not in priority steps')return'{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri)if pri else(queue,'','')))def get_queue_length(queue_name='celery'):"""Get the number of tasks in a celery queue.
:param queue_name: The name of the queue you want to inspect.
:return: the number of items in the queue.
"""
priority_names =[make_queue_name_for_pri(queue_name, pri)for pri in
DEFAULT_PRIORITY_STEPS]
r = redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],)return sum([r.llen(x)for x in priority_names])
Jeśli chcesz otrzymać aktualne zadanie, możesz użyć czegoś takiego:
redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0-1
Stamtąd będziesz musiał deserializować zwróconą listę. W moim przypadku udało mi się to osiągnąć za pomocą czegoś takiego:
r = redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],)
l = r.lrange('celery',0,-1)
pickle.loads(base64.decodestring(json.loads(l[0])['body']))
Ostrzegam tylko, że deserializacja może chwilę potrwać i będziesz musiał dostosować powyższe polecenia, aby działały z różnymi priorytetami.
Jeśli używasz Celery + Django, najprostszy sposób sprawdzania zadań za pomocą poleceń bezpośrednio z terminala w środowisku wirtualnym lub przy użyciu pełnej ścieżki do selera:
Jeśli masz zdefiniowany projekt, możesz użyćcelery -A my_proj inspect reserved
sashaboulouds
6
Rozwiązanie typu kopiuj-wklej dla Redis z serializacją json:
def get_celery_queue_items(queue_name):import base64
import json
# Get a configured instance of a celery app:from yourproject.celery import app as celery_app
with celery_app.pool.acquire(block=True)as conn:
tasks = conn.default_channel.client.lrange(queue_name,0,-1)
decoded_tasks =[]for task in tasks:
j = json.loads(task)
body = json.loads(base64.b64decode(j['body']))
decoded_tasks.append(body)return decoded_tasks
Działa z Django. Po prostu nie zapomnij się zmienić yourproject.celery.
Jeśli używasz serializatora marynat, możesz zmienić body =linię na body = pickle.loads(base64.b64decode(j['body'])).
Jim Hunziker
4
Wydaje się, że moduł inspekcji selera jest świadomy zadań tylko z perspektywy pracowników. Jeśli chcesz zobaczyć wiadomości, które są w kolejce (jeszcze nie zostały wyciągnięte przez pracowników), sugeruję użycie pyrabbita , który może współpracować z interfejsem http api rabbitmq w celu pobrania wszelkiego rodzaju informacji z kolejki.
Myślę, że jedynym sposobem na uzyskanie zadań, które oczekują, jest zachowanie listy rozpoczętych zadań i pozwolenie zadaniu na usunięcie się z listy po uruchomieniu.
w moim przypadku jobszawsze wynosi zero ... jakiś pomysł?
daveoncode
@daveoncode Nie sądzę, że to wystarczająca ilość informacji, abym mógł udzielić pomocy. Możesz otworzyć własne pytanie. Nie sądzę, aby był to duplikat tego, jeśli określisz, że chcesz pobrać informacje w Pythonie. Wróciłbym do stackoverflow.com/a/19465670/9843399 , na którym oparłem moją odpowiedź, i upewniłem się, że zadziała w pierwszej kolejności.
Caleb Syring
@CalebSyring To jest pierwsze podejście, które naprawdę pokazuje mi zadania w kolejce. Bardzo dobrze. Jedynym problemem dla mnie jest to, że dołączanie listy nie wydaje się działać. Jakieś pomysły, jak mogę sprawić, by funkcja wywołania zwrotnego zapisywała się na liście?
Varlor,
@Varlor Przykro mi, ktoś dokonał niewłaściwej edycji mojej odpowiedzi. Możesz przejrzeć historię zmian w poszukiwaniu oryginalnej odpowiedzi, która najprawdopodobniej będzie dla Ciebie odpowiednia. Pracuję nad rozwiązaniem tego problemu. (EDYCJA: Właśnie wszedłem i odrzuciłem edycję, która miała oczywisty błąd Pythona. Daj mi znać, czy to rozwiązało problem, czy nie.)
Caleb Syring
@CalebSyring Użyłem teraz twojego kodu w klasie, mając listę jako atrybut klasy!
Varlor
2
O ile wiem, Celery nie udostępnia API do sprawdzania zadań, które czekają w kolejce. To jest specyficzne dla brokera. Jeśli na przykład używasz Redis jako brokera, zbadanie zadań oczekujących w celery(domyślnej) kolejce jest tak proste, jak:
połączyć się z bazą danych brokera
lista elementów na celeryliście (na przykład polecenie LRANGE)
Należy pamiętać, że są to zadania CZEKAJĄCE na wybranie przez dostępnych pracowników. W Twoim klastrze mogą być uruchomione pewne zadania - nie będzie ich na tej liście, ponieważ zostały już wybrane.
Doszedłem do wniosku, że najlepszym sposobem uzyskania liczby zadań w kolejce jest użycie, rabbitmqctlco zostało tu kilkakrotnie zasugerowane. Aby umożliwić dowolnemu wybranemu użytkownikowi uruchomienie polecenia z sudo, postępowałem zgodnie z instrukcjami tutaj (pominąłem edycję części profilu, ponieważ nie mam nic przeciwko wpisywaniu sudo przed poleceniem).
Złapałem też Jamesca grepi cutsnippet i zawinąłem je w wywołania podprocesu.
from subprocess importPopen, PIPE
p1 =Popen(["sudo","rabbitmqctl","list_queues","-p","[name of your virtula host"], stdout=PIPE)
p2 =Popen(["grep","-e","^celery\s"], stdin=p1.stdout, stdout=PIPE)
p3 =Popen(["cut","-f2"], stdin=p2.stdout, stdout=PIPE)
p1.stdout.close()
p2.stdout.close()print("number of jobs on queue: %i"% int(p3.communicate()[0]))
from celery.task.control import inspect
def key_in_list(k, l):return bool([Truefor i in l if k in i.values()])def check_task(task_id):
task_value_dict = inspect().active().values()for task_list in task_value_dict:if self.key_in_list(task_id, task_list):returnTruereturnFalse
Jeśli kontrolujesz kod zadań, możesz obejść problem, pozwalając zadaniu wywołać trywialną ponowną próbę przy pierwszym wykonaniu, a następnie sprawdzając inspect().reserved(). Ponowna próba rejestruje zadanie z zapleczem wyników i seler to widzi. Zadanie musi zaakceptować selflub contextjako pierwszy parametr, abyśmy mogli uzyskać dostęp do liczby ponownych prób.
To rozwiązanie jest agnostyczne dla brokera, tj. nie musisz się martwić, czy do przechowywania zadań używasz RabbitMQ czy Redis.
EDYCJA: po przetestowaniu stwierdziłem, że jest to tylko częściowe rozwiązanie. Rozmiar zarezerwowanego jest ograniczony do ustawienia pobierania wstępnego dla pracownika.
Odpowiedzi:
EDYTUJ: Zobacz inne odpowiedzi, aby uzyskać listę zadań w kolejce.
Powinieneś zajrzeć tutaj: Przewodnik po selerze - Inspekcja pracowników
Zasadniczo to:
W zależności od tego, czego chcesz
źródło
i.reserved()
aby uzyskać listę zadań w kolejce.inspect(['celery@Flatty'])
. Ogromna poprawa szybkościinspect()
.jeśli używasz rabbitMQ, użyj tego w terminalu:
wypisze listę kolejek z liczbą oczekujących zadań. na przykład:
liczba w prawej kolumnie to liczba zadań w kolejce. powyżej, kolejka selera ma 166 oczekujących zadań.
źródło
grep -e "^celery\s" | cut -f2
aby wyodrębnić, że166
jeśli chcesz przetworzyć tę liczbę później, powiedzmy dla statystyk.Jeśli nie używasz zadań z priorytetami, jest to całkiem proste, jeśli używasz Redis. Aby otrzymać zadanie liczy się:
Ale zadania z priorytetami używają innego klucza w redis , więc pełny obraz jest nieco bardziej skomplikowany. Pełny obraz jest taki, że należy zapytać redis o każdy priorytet zadania. W Pythonie (i z projektu Flower) wygląda to tak:
Jeśli chcesz otrzymać aktualne zadanie, możesz użyć czegoś takiego:
Stamtąd będziesz musiał deserializować zwróconą listę. W moim przypadku udało mi się to osiągnąć za pomocą czegoś takiego:
Ostrzegam tylko, że deserializacja może chwilę potrwać i będziesz musiał dostosować powyższe polecenia, aby działały z różnymi priorytetami.
źródło
DATABASE_NUMBER
domyślnie używane jest0
iQUEUE_NAME
jestcelery
, więcredis-cli -n 0 llen celery
zwróci liczbę wiadomości w kolejce.'{{{0}}}{1}{2}'
zamiast'{0}{1}{2}'
. Poza tym działa to doskonale!Aby pobrać zadania z zaplecza, użyj tego
źródło
Jeśli używasz Celery + Django, najprostszy sposób sprawdzania zadań za pomocą poleceń bezpośrednio z terminala w środowisku wirtualnym lub przy użyciu pełnej ścieżki do selera:
Dokument : http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke#inspecting-workers
Również jeśli używasz Celery + RabbitMQ , możesz sprawdzić listę kolejek za pomocą następującego polecenia:
Więcej informacji : https://linux.die.net/man/1/rabbitmqctl
źródło
celery -A my_proj inspect reserved
Rozwiązanie typu kopiuj-wklej dla Redis z serializacją json:
Działa z Django. Po prostu nie zapomnij się zmienić
yourproject.celery
.źródło
body =
linię nabody = pickle.loads(base64.b64decode(j['body']))
.Wydaje się, że moduł inspekcji selera jest świadomy zadań tylko z perspektywy pracowników. Jeśli chcesz zobaczyć wiadomości, które są w kolejce (jeszcze nie zostały wyciągnięte przez pracowników), sugeruję użycie pyrabbita , który może współpracować z interfejsem http api rabbitmq w celu pobrania wszelkiego rodzaju informacji z kolejki.
Przykład można znaleźć tutaj: Pobierz długość kolejki za pomocą Celery (RabbitMQ, Django)
źródło
Myślę, że jedynym sposobem na uzyskanie zadań, które oczekują, jest zachowanie listy rozpoczętych zadań i pozwolenie zadaniu na usunięcie się z listy po uruchomieniu.
Z rabbitmqctl i list_queues możesz zobaczyć, ile zadań czeka, ale nie same zadania: http://www.rabbitmq.com/man/rabbitmqctl.1.man.html
Jeśli to, co chcesz, zawiera przetwarzane zadanie, ale jeszcze nie zostało ukończone, możesz zachować listę swoich zadań i sprawdzić ich stany:
Możesz też pozwolić Selerowi przechowywać wyniki w CELERY_RESULT_BACKEND i sprawdzić, których zadań tam nie ma.
źródło
To zadziałało w mojej aplikacji:
active_jobs
będzie listą ciągów odpowiadających zadaniom w kolejce.Nie zapomnij wymienić CELERY_APP_INSTANCE na własną.
Dziękuję @ashish za wskazanie mi właściwego kierunku i jego odpowiedź tutaj: https://stackoverflow.com/a/19465670/9843399
źródło
jobs
zawsze wynosi zero ... jakiś pomysł?O ile wiem, Celery nie udostępnia API do sprawdzania zadań, które czekają w kolejce. To jest specyficzne dla brokera. Jeśli na przykład używasz Redis jako brokera, zbadanie zadań oczekujących w
celery
(domyślnej) kolejce jest tak proste, jak:celery
liście (na przykład polecenie LRANGE)Należy pamiętać, że są to zadania CZEKAJĄCE na wybranie przez dostępnych pracowników. W Twoim klastrze mogą być uruchomione pewne zadania - nie będzie ich na tej liście, ponieważ zostały już wybrane.
źródło
Doszedłem do wniosku, że najlepszym sposobem uzyskania liczby zadań w kolejce jest użycie,
rabbitmqctl
co zostało tu kilkakrotnie zasugerowane. Aby umożliwić dowolnemu wybranemu użytkownikowi uruchomienie polecenia zsudo
, postępowałem zgodnie z instrukcjami tutaj (pominąłem edycję części profilu, ponieważ nie mam nic przeciwko wpisywaniu sudo przed poleceniem).Złapałem też Jamesca
grep
icut
snippet i zawinąłem je w wywołania podprocesu.źródło
źródło
Jeśli kontrolujesz kod zadań, możesz obejść problem, pozwalając zadaniu wywołać trywialną ponowną próbę przy pierwszym wykonaniu, a następnie sprawdzając
inspect().reserved()
. Ponowna próba rejestruje zadanie z zapleczem wyników i seler to widzi. Zadanie musi zaakceptowaćself
lubcontext
jako pierwszy parametr, abyśmy mogli uzyskać dostęp do liczby ponownych prób.To rozwiązanie jest agnostyczne dla brokera, tj. nie musisz się martwić, czy do przechowywania zadań używasz RabbitMQ czy Redis.
EDYCJA: po przetestowaniu stwierdziłem, że jest to tylko częściowe rozwiązanie. Rozmiar zarezerwowanego jest ograniczony do ustawienia pobierania wstępnego dla pracownika.
źródło
Z
subprocess.run
:Należy uważać, aby zmienić
my_proj
zyour_proj
źródło