Próbuję wymyślić metodologię testowania dla naszego projektu django-seler . Przeczytałem notatki w dokumentacji , ale nie dały mi one dobrego pojęcia, co właściwie mam robić. Nie martwię się testowaniem zadań w rzeczywistych demonach, tylko funkcjonalność mojego kodu. Głównie się zastanawiam:
- Jak możemy ominąć
task.delay()
podczas testu (próbowałem ustawić,CELERY_ALWAYS_EAGER = True
ale to bez różnicy)? - W jaki sposób używamy zalecanych ustawień testowych (jeśli to najlepszy sposób) bez faktycznej zmiany naszego settings.py?
- Can we still use
manage.py test
or do we have to use a custom runner?
Overall any hints or tips for testing with celery would be very helpful.
python
django
unit-testing
celery
Jason Webb
źródło
źródło
CELERY_ALWAYS_EAGER
nie robi różnicy.delay
próba nawiązania połączenia.BROKER_BACKEND=memory
W takim przypadku ustawienie może pomóc.BROKER_BACKEND=memory
naprawione. Jeśli podasz to jako odpowiedź, zaznaczę to jako poprawne.Odpowiedzi:
Spróbuj ustawić:
BROKER_BACKEND = 'memory'
(Dzięki komentarzowi asksol .)
źródło
Lubię używać dekoratora override_settings w testach, które wymagają wyników selera.
from django.test import TestCase from django.test.utils import override_settings from myapp.tasks import mytask class AddTestCase(TestCase): @override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True, CELERY_ALWAYS_EAGER=True, BROKER_BACKEND='memory') def test_mytask(self): result = mytask.delay() self.assertTrue(result.successful())
Jeśli chcesz zastosować to do wszystkich testów, możesz użyć narzędzia do uruchamiania testów selera, jak opisano na http://docs.celeryproject.org/en/2.5/django/unit-testing.html, który zasadniczo określa te same ustawienia, z wyjątkiem (
BROKER_BACKEND = 'memory'
).W ustawieniach:
TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'
Spójrz na źródło CeleryTestSuiteRunner i jest całkiem jasne, co się dzieje.
źródło
djcelery
.Oto fragment mojej testowej klasy bazowej, która odcina
apply_async
metodę i zapisuje wywołania do niej (co obejmujeTask.delay
.) Jest trochę obrzydliwy, ale udało mi się dopasować do moich potrzeb w ciągu ostatnich kilku miesięcy, kiedy z niej korzystałem.from django.test import TestCase from celery.task.base import Task # For recent versions, Task has been moved to celery.task.app: # from celery.app.task import Task # See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html class CeleryTestCaseBase(TestCase): def setUp(self): super(CeleryTestCaseBase, self).setUp() self.applied_tasks = [] self.task_apply_async_orig = Task.apply_async @classmethod def new_apply_async(task_class, args=None, kwargs=None, **options): self.handle_apply_async(task_class, args, kwargs, **options) # monkey patch the regular apply_sync with our method Task.apply_async = new_apply_async def tearDown(self): super(CeleryTestCaseBase, self).tearDown() # Reset the monkey patch to the original method Task.apply_async = self.task_apply_async_orig def handle_apply_async(self, task_class, args=None, kwargs=None, **options): self.applied_tasks.append((task_class, tuple(args), kwargs)) def assert_task_sent(self, task_class, *args, **kwargs): was_sent = any(task_class == task[0] and args == task[1] and kwargs == task[2] for task in self.applied_tasks) self.assertTrue(was_sent, 'Task not called w/class %s and args %s' % (task_class, args)) def assert_task_not_sent(self, task_class): was_sent = any(task_class == task[0] for task in self.applied_tasks) self.assertFalse(was_sent, 'Task was not expected to be called, but was. Applied tasks: %s' % self.applied_tasks)
Oto przykład „z góry głowy”, jak można go użyć w przypadkach testowych:
mymodule.py
from my_tasks import SomeTask def run_some_task(should_run): if should_run: SomeTask.delay(1, some_kwarg=2)
test_mymodule.py
class RunSomeTaskTest(CeleryTestCaseBase): def test_should_run(self): run_some_task(should_run=True) self.assert_task_sent(SomeTask, 1, some_kwarg=2) def test_should_not_run(self): run_some_task(should_run=False) self.assert_task_not_sent(SomeTask)
źródło
ponieważ nadal widzę ten komunikat w wynikach wyszukiwania, zastąpienie ustawień za pomocą
TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'
pracował dla mnie zgodnie z Celery Docs
źródło
Dla wszystkich, którzy przybędą tu w 2019 roku: zapoznaj się z tym artykułem obejmującym różne strategie, w tym synchroniczne wywoływanie zadań.
źródło
To właśnie zrobiłem
Wewnątrz myapp.tasks.py mam:
from celery import shared_task @shared_task() def add(a, b): return a + b
Wewnątrz myapp.test_tasks.py mam:
from django.test import TestCase, override_settings from myapp.tasks import add class TasksTestCase(TestCase): def setUp(self): ... @override_settings(CELERY_TASK_ALWAYS_EAGER=True,CELERY_TASK_EAGER_PROPOGATES=True) def test_create_sections(self): result= add.delay(1,2) assert result.successful() == True assert result.get() == 3
źródło