Jak przetestować zadanie z selerem?

Odpowiedzi:

61

Możliwe jest synchroniczne testowanie zadań przy użyciu dowolnej biblioteki unittest. Zwykle wykonuję 2 różne sesje testowe podczas pracy z zadaniami z selera. Pierwszy (jak sugeruję poniżej) jest całkowicie synchroniczny i powinien być tym, który zapewnia, że ​​algorytm robi to, co powinien. Druga sesja wykorzystuje cały system (w tym brokera) i upewnia się, że nie mam problemów z serializacją lub inną dystrybucją, problemem z komunikacją.

Więc:

from celery import Celery

celery = Celery()

@celery.task
def add(x, y):
    return x + y

I twój test:

from nose.tools import eq_

def test_add_task():
    rst = add.apply(args=(4, 4)).get()
    eq_(rst, 8)

Mam nadzieję, że to pomoże!

FlaPer87
źródło
1
Działa to z wyjątkiem zadań, które używają HttpDispatchTask - docs.celeryproject.org/en/latest/userguide/remote-tasks.html, gdzie muszę ustawić celery.conf.CELERY_ALWAYS_EAGER = True, ale nawet z ustawieniem celery.conf.CELERY_IMPORTS = ('selery.task.http') test kończy się niepowodzeniem z NotRegistered: selery.task.http.HttpDispatchTask
davidmytton
Dziwne, czy na pewno nie masz problemów z importowaniem? Ten test działa (zwróć uwagę, że fałszywa odpowiedź, więc zwraca to, czego oczekuje seler). Również moduły zdefiniowane w CELERY_IMPORTS zostaną zaimportowane podczas inicjalizacji pracowników , aby tego uniknąć sugeruję zadzwonić celery.loader.import_default_modules().
FlaPer87
Proponuję również zajrzeć tutaj . Kpi z żądania http. Nie wiem, czy to pomaga. Myślę, że chcesz przetestować działającą usługę, prawda?
FlaPer87
52

Używam tego:

with mock.patch('celeryconfig.CELERY_ALWAYS_EAGER', True, create=True):
    ...

Dokumenty: http://docs.celeryproject.org/en/3.1/configuration.html#celery-always-eager

CELERY_ALWAYS_EAGER umożliwia synchroniczne uruchamianie zadań i nie potrzebujesz serwera selera.

guettli
źródło
1
Myślę, że to jest nieaktualne - rozumiem ImportError: No module named celeryconfig.
Daenyth
7
Uważam, że powyżej zakłada się, że moduł celeryconfig.pyistnieje w jednym pakiecie. Zobacz docs.celeryproject.org/en/latest/getting-started/… .
Kamil Sindi
1
Wiem, że jest stary, ale czy możesz podać pełny przykład, jak uruchamiać zadania addz pytania OP w TestCaseklasie?
Kruupös
@ MaxChrétien przepraszam, nie mogę podać pełnego przykładu, ponieważ nie używam już selera. Możesz edytować moje pytanie, jeśli masz wystarczająco dużo punktów reputacji. Jeśli nie masz wystarczającej ilości, daj mi znać, co mam skopiować i wkleić do tej odpowiedzi.
guettli
1
@ miken32 thanks. Ponieważ najnowsza odpowiedź w jakiś sposób rozwiązuje problem, z którym chciałem pomóc, zostawiłem tylko komentarz, którego oficjalna dokumentacja 4.0 odradza używanie CELERY_TASK_ALWAYS_EAGERdo testów jednostkowych.
krassowski
33

Zależy od tego, co dokładnie chcesz testować.

  • Przetestuj kod zadania bezpośrednio. Nie wywołuj „task.delay (...)” po prostu wywołaj „task (...)” z testów jednostkowych.
  • Użyj CELERY_ALWAYS_EAGER . Spowoduje to, że Twoje zadania będą wywoływane natychmiast w momencie, gdy powiesz „task.delay (...)”, więc możesz przetestować całą ścieżkę (ale nie asynchroniczne zachowanie).
slacy
źródło
24

unittest

import unittest

from myproject.myapp import celeryapp

class TestMyCeleryWorker(unittest.TestCase):

  def setUp(self):
      celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)

py.test armatura

# conftest.py
from myproject.myapp import celeryapp

@pytest.fixture(scope='module')
def celery_app(request):
    celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
    return celeryapp

# test_tasks.py
def test_some_task(celery_app):
    ...

Dodatek: spraw, by send_task cieszył się szacunkiem

from celery import current_app

def send_task(name, args=(), kwargs={}, **opts):
    # https://github.com/celery/celery/issues/581
    task = current_app.tasks[name]
    return task.apply(args, kwargs, **opts)

current_app.send_task = send_task
Kamil Sindi
źródło
23

Dla tych na Seler 4 to:

@override_settings(CELERY_TASK_ALWAYS_EAGER=True)

Ponieważ nazwy ustawień zostały zmienione i wymagają aktualizacji, jeśli zdecydujesz się na aktualizację, zobacz

https://docs.celeryproject.org/en/latest/history/whatsnew-4.0.html?highlight=what%20is%20new#lowercase-setting-names

okrutny
źródło
11
Zgodnie z oficjalną dokumentacją , użycie „task_always_eager” (wcześniej „CELERY_ALWAYS_EAGER”) nie jest odpowiednie do testów jednostkowych. Zamiast tego proponują inne, świetne sposoby testowania jednostkowego aplikacji Seler.
krassowski
4
Dodam tylko, że powodem, dla którego nie chcesz chętnych zadań w testach jednostkowych, jest to, że wtedy nie testujesz np. Serializacji parametrów, która nastąpi, gdy użyjesz kodu w produkcji.
cholera
15

Od wersji Celery 3.0 jednym ze sposobów ustawienia CELERY_ALWAYS_EAGERw Django jest:

from django.test import TestCase, override_settings

from .foo import foo_celery_task

class MyTest(TestCase):

    @override_settings(CELERY_ALWAYS_EAGER=True)
    def test_foo(self):
        self.assertTrue(foo_celery_task.delay())
Aaron Lelevier
źródło
7

Od wersji 4.0 Selera, urządzenia py.test są dostarczane do uruchomienia pracownika selera tylko na czas testu i są wyłączane po zakończeniu:

def test_myfunc_is_executed(celery_session_worker):
    # celery_session_worker: <Worker: [email protected] (running)>
    assert myfunc.delay().wait(3)

Wśród innych urządzeń opisanych na http://docs.celeryproject.org/en/latest/userguide/testing.html#py-test możesz zmienić domyślne opcje selera, ponownie definiując celery_configurządzenie w ten sposób:

@pytest.fixture(scope='session')
def celery_config():
    return {
        'accept_content': ['json', 'pickle'],
        'result_serializer': 'pickle',
    }

Domyślnie proces roboczy testu używa brokera w pamięci i zaplecza wyników. Nie ma potrzeby korzystania z lokalnego Redis lub RabbitMQ, jeśli nie testujesz określonych funkcji.

alanjds
źródło
Drogi przeciwniku, czy chciałbyś się podzielić, dlaczego jest to zła odpowiedź? Serdecznie dziękuję.
alanjds
2
Nie działał dla mnie, pakiet testowy po prostu się zawiesza. Czy mógłbyś podać więcej kontekstu? (Jednak jeszcze nie głosowałem;)).
dwoistość
W moim przypadku musiałem jawnie ustawić urządzenie celey_config na korzystanie z brokera pamięci i pamięci podręcznej + zaplecza pamięci
sanzoghenzo
5

odniesienie using pytest.

def test_add(celery_worker):
    mytask.delay()

jeśli używasz flask, ustaw plik config

    CELERY_BROKER_URL = 'memory://'
    CELERY_RESULT_BACKEND = 'cache+memory://'

i w conftest.py

@pytest.fixture
def app():
    yield app   # Your actual Flask application

@pytest.fixture
def celery_app(app):
    from celery.contrib.testing import tasks   # need it
    yield celery_app    # Your actual Flask-Celery application
Yoge
źródło
2

W moim przypadku (i zakładam wiele innych) chciałem tylko przetestować wewnętrzną logikę zadania za pomocą pytest.

TL; DR; skończyło się wyszydzaniem wszystkiego ( OPCJA 2 )


Przykład zastosowania :

proj/tasks.py

@shared_task(bind=True)
def add_task(self, a, b):
    return a+b;

tests/test_tasks.py

from proj import add_task

def test_add():
    assert add_task(1, 2) == 3, '1 + 2 should equal 3'

ale ponieważ shared_taskdekorator ma dużo wewnętrznej logiki selera, tak naprawdę nie jest to test jednostkowy.

Tak więc dla mnie były 2 opcje:

OPCJA 1: Oddzielna logika wewnętrzna

proj/tasks_logic.py

def internal_add(a, b):
    return a + b;

proj/tasks.py

from .tasks_logic import internal_add

@shared_task(bind=True)
def add_task(self, a, b):
    return internal_add(a, b);

Wygląda to bardzo dziwnie i poza tym, że jest mniej czytelny, wymaga ręcznego wyodrębnienia i przekazania atrybutów, które są częścią żądania, na przykład task_id w razie potrzeby, co sprawia, że ​​logika jest mniej czysta.

OPCJA 2: kpi z
wyszydzania wnętrzności selera

tests/__init__.py

# noinspection PyUnresolvedReferences
from celery import shared_task

from mock import patch


def mock_signature(**kwargs):
    return {}


def mocked_shared_task(*decorator_args, **decorator_kwargs):
    def mocked_shared_decorator(func):
        func.signature = func.si = func.s = mock_signature
        return func

    return mocked_shared_decorator

patch('celery.shared_task', mocked_shared_task).start()

co następnie pozwala mi na mockowanie obiektu żądania (ponownie, jeśli potrzebujesz rzeczy z żądania, takich jak identyfikator lub licznik ponownych prób.

tests/test_tasks.py

from proj import add_task

class MockedRequest:
    def __init__(self, id=None):
        self.id = id or 1


class MockedTask:
    def __init__(self, id=None):
        self.request = MockedRequest(id=id)


def test_add():
    mocked_task = MockedTask(id=3)
    assert add_task(mocked_task, 1, 2) == 3, '1 + 2 should equal 3'

To rozwiązanie jest znacznie bardziej ręczne, ale daje mi kontrolę, której potrzebuję, aby wykonać testy jednostkowe , bez powtarzania się i bez utraty celownika.

Daniel Dubovski
źródło