System zdarzeń w Pythonie

196

Jakiego systemu zdarzeń dla Pythona używasz? Wiem już o pydispatcherze , ale zastanawiałem się, co jeszcze można znaleźć lub czy jest powszechnie używany?

Nie interesują mnie menedżery zdarzeń, które są częścią dużych frameworków, wolałbym raczej małe, gotowe rozwiązanie, które mogę łatwo rozszerzyć.

Josip
źródło

Odpowiedzi:

179

Pakiety PyPI

Od czerwca 2020 r. Są to pakiety związane ze zdarzeniami dostępne w PyPI, uporządkowane według ostatniej daty wydania.

Jest więcej

Jest wiele bibliotek do wyboru, przy użyciu bardzo różnych terminów (zdarzenia, sygnały, procedury obsługi, wysyłanie metod, przechwytywanie, ...).

Staram się zachować przegląd powyższych pakietów oraz techniki wymienione w odpowiedziach tutaj.

Po pierwsze, trochę terminologii ...

Wzór obserwatora

Najbardziej podstawowym stylem systemu zdarzeń jest „torba metod obsługi”, która jest prostą implementacją wzorca Observer .

Zasadniczo metody modułu obsługi (wywołania) są przechowywane w tablicy i każda z nich jest wywoływana, gdy zdarzenie „odpala”.

Publikuj-Subskrybuj

Wadą systemów zdarzeń Observer jest to, że można zarejestrować procedury obsługi tylko w rzeczywistym obiekcie Event (lub liście procedur obsługi). W momencie rejestracji wydarzenie musi już istnieć.

Dlatego istnieje drugi styl systemów zdarzeń: wzorzec publikowania-subskrybowania . W tym przypadku procedury obsługi nie rejestrują się w obiekcie zdarzenia (lub liście procedur obsługi), ale w centralnym programie dyspozytorskim. Powiadamiający rozmawiają tylko z dyspozytorem. O czym nasłuchiwać, a co publikować, decyduje „sygnał”, który jest niczym innym jak nazwą (ciągiem).

Wzór mediatora

Może również być interesujące: wzór Mediatora .

Haki

System „hook” jest zwykle używany w kontekście wtyczek aplikacji. Aplikacja zawiera stałe punkty integracji (zaczepy), a każda wtyczka może połączyć się z tym zaczepem i wykonać określone czynności.

Inne wydarzenia'

Uwaga: Threading.Event nie jest „systemem zdarzeń” w powyższym znaczeniu. Jest to system synchronizacji wątków, w którym jeden wątek czeka, aż inny wątek „zasygnalizuje” obiekt Event.

Biblioteki wiadomości sieciowych często używają również terminu „zdarzenia”; czasami są one podobne pod względem koncepcji; czasami nie. Mogą oczywiście przekraczać granice nici, procesu i komputera. Patrz np. Pyzmq , pymq , Twisted , Tornado , gevent , eventlet .

Słabe referencje

W Pythonie trzymanie odwołania do metody lub obiektu zapewnia, że ​​nie zostanie ono usunięte przez moduł odśmiecający. Może to być pożądane, ale może również prowadzić do wycieków pamięci: powiązane programy obsługi nigdy nie są usuwane.

Niektóre systemy zdarzeń używają słabych referencji zamiast zwykłych, aby rozwiązać ten problem.

Kilka słów o różnych bibliotekach

Systemy wydarzeń w stylu obserwatora:

  • zope.event pokazuje gołe kości tego, jak to działa (patrz odpowiedź Lennarta ). Uwaga: ten przykład nie obsługuje nawet argumentów modułu obsługi.
  • Implementacja „listy wywołań” LongPoke pokazuje, że taki system zdarzeń można wdrożyć bardzo minimalistycznie przez podklasowanie list.
  • Odmiana Felk EventHook zapewnia również podpisy osób biorących udział i dzwoniących.
  • EventHook Spassiga (wzorzec zdarzenia Michaela Foorda ) to prosta implementacja.
  • Klasa Eventu Cennych Lekcji Josipa jest zasadniczo taka sama, ale używa setzamiast zamiast a listdo przechowywania torby i narzędzi, __call__które są rozsądnymi dodatkami.
  • PyNotify ma podobną koncepcję, a także zapewnia dodatkowe koncepcje zmiennych i warunków („zmienne zdarzenie zmienione”). Strona główna nie działa.
  • axel to w zasadzie torba z uchwytami z wieloma funkcjami związanymi z gwintowaniem, obsługą błędów, ...
  • Python-dispatch wymaga wyprowadzenia parzystych klas źródłowych pydispatch.Dispatcher.
  • buslane jest oparty na klasach, obsługuje pojedyncze lub wielokrotne moduły obsługi i ułatwia obszerne wskazówki dotyczące typów.
  • Pithikos ' Observer / Event to lekka konstrukcja.

Publikuj i subskrybuj biblioteki:

  • migacz ma kilka fajnych funkcji, takich jak automatyczne rozłączanie i filtrowanie w oparciu o nadawcę.
  • PyPubSub jest stabilnym pakietem i obiecuje „zaawansowane funkcje ułatwiające debugowanie i zarządzanie tematami i wiadomościami”.
  • pymitter jest portem Python w Node.js EventEmitter2 i oferuje przestrzenie nazw, symbole wieloznaczne i TTL.
  • PyDispatcher wydaje się kłaść nacisk na elastyczność w odniesieniu do publikacji „wiele do wielu” itp. Obsługuje słabe referencje.
  • Louie jest przerobionym PyDispatcherem i powinien działać „w szerokim zakresie kontekstów”.
  • pypydispatcher jest oparty na (zgadłeś ...) PyDispatcher, a także działa w PyPy.
  • django.dispatch to przerobiony PyDispatcher „z bardziej ograniczonym interfejsem, ale o wyższej wydajności”.
  • pyeventdispatcher opiera się na narzędziu do wysyłania zdarzeń frameworku Symfony w PHP
  • dispatcher został wyodrębniony z django.dispatch, ale robi się już dość stary.
  • EventManger Cristiana Garcii to naprawdę krótkie wdrożenie.

Inne:

  • pluggy zawiera system haka używany przez pytestwtyczki.
  • RxPy3 implementuje wzorzec obserwowalny i umożliwia scalanie zdarzeń, ponawianie itp.
  • Sygnały i gniazda Qt są dostępne w PyQt lub PySide2 . Działają one jako wywołanie zwrotne, gdy są używane w tym samym wątku lub jako zdarzenia (przy użyciu pętli zdarzeń) między dwoma różnymi wątkami. Sygnały i gniazda mają takie ograniczenie, że działają tylko w obiektach klas, które się z nich wywodzą QObject.
florisla
źródło
2
Jest też Louie, oparty na PyDispatcher: pypi.python.org/pypi/Louie/1.1
the979kid
@ the979kid louie wydaje się być źle utrzymany, strona pypi prowadzi do 404 na GitHub: 11craft.github.io/louie ; github.com/gldnspud/louie . Powinien być github.com/11craft/louie .
florisla
1
słabe odbiorniki zdarzeń są powszechną potrzebą. W przeciwnym razie korzystanie ze świata rzeczywistego staje się uciążliwe. Uwaga, które rozwiązania mogą być przydatne.
kxr 11.04.17
Pypubsub 4 jest wiele do wielu i ma potężne narzędzia do debugowania wiadomości oraz kilka sposobów ograniczania ładunku wiadomości, abyś wiedział wcześniej, kiedy wysłałeś nieprawidłowe dane lub brakujące dane. PyPubSub 4 obsługuje Python 3 (a PyPubSub 3.x obsługuje Python 2).
Oliver
Niedawno opublikowałem bibliotekę o nazwie pymq github.com/thrau/pymq, która może być odpowiednia dla tej listy.
czw
99

Robiłem to w ten sposób:

class Event(list):
    """Event subscription.

    A list of callable objects. Calling an instance of this will cause a
    call to each item in the list in ascending order by index.

    Example Usage:
    >>> def f(x):
    ...     print 'f(%s)' % x
    >>> def g(x):
    ...     print 'g(%s)' % x
    >>> e = Event()
    >>> e()
    >>> e.append(f)
    >>> e(123)
    f(123)
    >>> e.remove(f)
    >>> e()
    >>> e += (f, g)
    >>> e(10)
    f(10)
    g(10)
    >>> del e[0]
    >>> e(2)
    g(2)

    """
    def __call__(self, *args, **kwargs):
        for f in self:
            f(*args, **kwargs)

    def __repr__(self):
        return "Event(%s)" % list.__repr__(self)

Jednak, podobnie jak w przypadku wszystkiego, co widziałem, nie ma do tego automatycznie generowanego pydoc i żadnych podpisów, co naprawdę jest do kitu.

L̲̳o̲̳̳n̲̳̳g̲̳̳p̲̳o̲̳̳k̲̳̳e̲̳̳
źródło
3
Uważam ten styl za dość intrygujący. To słodko gołe kości. Podoba mi się fakt, że pozwala on manipulować zdarzeniami i ich subskrybentami jako autonomiczne operacje. Zobaczę, jak sobie radzi w prawdziwym projekcie.
Rudy Lattae,
2
Bardzo piękny minimalistyczny styl! Wspaniały!
akaRem
2
Nie mogę głosować za tym wystarczająco, jest to naprawdę proste i łatwe.
2
wielka przysługa, czy ktoś mógłby wyjaśnić to tak, jakbym miał 10 lat? Czy ta klasa jest dziedziczona przez klasę główną? Nie widzę init, więc super () nie byłby używany. Z jakiegoś powodu to mnie nie klika.
omgimdrunk
1
@omgimdrunk Prosta procedura obsługi zdarzeń uruchamiałaby jedną lub więcej wywoływanych funkcji przy każdym uruchomieniu zdarzenia. Klasa do „zarządzania” tym wymagałaby co najmniej następujących metod - dodaj i odpal. W ramach tej klasy należałoby zachować listę procedur do wykonania. Umieśćmy to w zmiennej instancji, _bag_of_handlersktóra jest listą. Metoda dodawania klasy byłaby po prostu self._bag_of_handlers.append(some_callable). Metoda ogniowa klasy przechodziłaby przez „_bag_of_handler”, przekazując dostarczone argumenty i kwargs do procedur obsługi i wykonując je kolejno.
Gabe Spradlin,
69

Używamy EventHook zgodnie z sugestią Michaela Foorda w jego Wzorze wydarzenia :

Po prostu dodaj EventHook do swoich zajęć za pomocą:

class MyBroadcaster()
    def __init__():
        self.onChange = EventHook()

theBroadcaster = MyBroadcaster()

# add a listener to the event
theBroadcaster.onChange += myFunction

# remove listener from the event
theBroadcaster.onChange -= myFunction

# fire event
theBroadcaster.onChange.fire()

Dodajemy funkcję usuwania wszystkich detektorów z obiektu do klasy Michaels i otrzymaliśmy:

class EventHook(object):

    def __init__(self):
        self.__handlers = []

    def __iadd__(self, handler):
        self.__handlers.append(handler)
        return self

    def __isub__(self, handler):
        self.__handlers.remove(handler)
        return self

    def fire(self, *args, **keywargs):
        for handler in self.__handlers:
            handler(*args, **keywargs)

    def clearObjectHandlers(self, inObject):
        for theHandler in self.__handlers:
            if theHandler.im_self == inObject:
                self -= theHandler
spassig
źródło
Wadą korzystania z tego jest to, że musisz najpierw dodać zdarzenie, zanim zarejestrujesz się jako subskrybent. Jeśli tylko wydawcy dodają swoje wydarzenia (nie jest to konieczne, po prostu dobra praktyka), musisz zainicjować wydawców przed subskrybentami, co jest uciążliwe w dużych projektach
Jonathan
6
ostatnia metoda jest błędna, ponieważ własne procedury obsługi .__ są modyfikowane podczas iteracji. Poprawka: `self .__ handlers = [h for h in.. Handlers if h.im_self! = Obj]`
Simon Bergot
1
@Simon ma rację, ale wprowadza błąd, ponieważ możemy mieć niezwiązane funkcje w samodzielnych programach obsługi .__. Poprawka:self.__handlers = [h for h in self._handlers if getattr(h, 'im_self', False) != obj]
Eric Marcos
20

Używam zope.event . To najbardziej nagie kości, jakie możesz sobie wyobrazić. :-) Oto pełny kod źródłowy:

subscribers = []

def notify(event):
    for subscriber in subscribers:
        subscriber(event)

Pamiętaj, że nie możesz na przykład wysyłać wiadomości między procesami. To nie jest system wiadomości, tylko system zdarzeń, nic więcej, nic więcej.

Lennart Regebro
źródło
17
pypi.python.org/pypi/zope.event ... aby uratować biedne Google przed niektórymi
pasmami
Nadal chciałbym móc wysyłać wiadomości. Używałbym systemu zdarzeń w aplikacji zbudowanej na Tkinter. Nie używam tego systemu zdarzeń, ponieważ nie obsługuje wiadomości.
Josip
Możesz wysłać cokolwiek zechcesz dzięki zope.event. Chodzi mi jednak o to, że nie jest to odpowiedni system przesyłania wiadomości, ponieważ nie można wysyłać zdarzeń / wiadomości do innych procesów lub innych komputerów. Prawdopodobnie powinieneś być bardziej szczegółowy w swoich wymaganiach.
Lennart Regebro
15

Znalazłem ten mały skrypt w Valued Lessons . Wygląda na to, że mam odpowiedni stosunek prostoty do mocy, o który mi chodzi. Peter Thatcher jest autorem następującego kodu (nie wspomniano o licencjonowaniu).

class Event:
    def __init__(self):
        self.handlers = set()

    def handle(self, handler):
        self.handlers.add(handler)
        return self

    def unhandle(self, handler):
        try:
            self.handlers.remove(handler)
        except:
            raise ValueError("Handler is not handling this event, so cannot unhandle it.")
        return self

    def fire(self, *args, **kargs):
        for handler in self.handlers:
            handler(*args, **kargs)

    def getHandlerCount(self):
        return len(self.handlers)

    __iadd__ = handle
    __isub__ = unhandle
    __call__ = fire
    __len__  = getHandlerCount

class MockFileWatcher:
    def __init__(self):
        self.fileChanged = Event()

    def watchFiles(self):
        source_path = "foo"
        self.fileChanged(source_path)

def log_file_change(source_path):
    print "%r changed." % (source_path,)

def log_file_change2(source_path):
    print "%r changed!" % (source_path,)

watcher              = MockFileWatcher()
watcher.fileChanged += log_file_change2
watcher.fileChanged += log_file_change
watcher.fileChanged -= log_file_change2
watcher.watchFiles()
Josip
źródło
1
Używanie set () zamiast listy jest miłe, aby uniknąć podwójnej rejestracji programów obsługi. Jedną z konsekwencji jest to, że procedury obsługi nie są wywoływane w kolejności, w jakiej zostały zarejestrowane. Niekoniecznie jednak zła rzecz ...
florisla 24.04.13
1
@florisla może zamienić się na Zestaw Zamówień, jeśli jest to pożądane.
Robino
9

Oto minimalny projekt, który powinien działać dobrze. Musisz po prostu odziedziczyć Observerw klasie, a następnie użyć observe(event_name, callback_fn)nasłuchiwać określonego zdarzenia. Za każdym razem, gdy to konkretne zdarzenie zostanie uruchomione w dowolnym miejscu kodu (tj. Event('USB connected')), Odpowiednie wywołanie zwrotne będzie uruchamiane.

class Observer():
    _observers = []
    def __init__(self):
        self._observers.append(self)
        self._observed_events = []
    def observe(self, event_name, callback_fn):
        self._observed_events.append({'event_name' : event_name, 'callback_fn' : callback_fn})


class Event():
    def __init__(self, event_name, *callback_args):
        for observer in Observer._observers:
            for observable in observer._observed_events:
                if observable['event_name'] == event_name:
                    observable['callback_fn'](*callback_args)

Przykład:

class Room(Observer):
    def __init__(self):
        print("Room is ready.")
        Observer.__init__(self) # DON'T FORGET THIS
    def someone_arrived(self, who):
        print(who + " has arrived!")

# Observe for specific event
room = Room()
room.observe('someone arrived',  room.someone_arrived)

# Fire some events
Event('someone left',    'John')
Event('someone arrived', 'Lenard') # will output "Lenard has arrived!"
Event('someone Farted',  'Lenard')
Pithikos
źródło
Podoba mi się twój projekt, jest minimalistyczny i łatwy do zrozumienia. i byłoby to lekkie, bez konieczności importowania niektórych modułów.
Atreyagaurav
8

Stworzyłem EventManagerklasę (kod na końcu). Składnia jest następująca:

#Create an event with no listeners assigned to it
EventManager.addEvent( eventName = [] )

#Create an event with listeners assigned to it
EventManager.addEvent( eventName = [fun1, fun2,...] )

#Create any number event with listeners assigned to them
EventManager.addEvent( eventName1 = [e1fun1, e1fun2,...], eventName2 = [e2fun1, e2fun2,...], ... )

#Add or remove listener to an existing event
EventManager.eventName += extra_fun
EventManager.eventName -= removed_fun

#Delete an event
del EventManager.eventName

#Fire the event
EventManager.eventName()

Oto przykład:

def hello(name):
    print "Hello {}".format(name)
    
def greetings(name):
    print "Greetings {}".format(name)

EventManager.addEvent( salute = [greetings] )
EventManager.salute += hello

print "\nInitial salute"
EventManager.salute('Oscar')

print "\nNow remove greetings"
EventManager.salute -= greetings
EventManager.salute('Oscar')

Wynik:

Wstępne pozdrowienie
Pozdrowienia Oscar
Hello Oscar

Teraz usuń pozdrowienia
Witaj Oscar

Kod EventManger:

class EventManager:
    
    class Event:
        def __init__(self,functions):
            if type(functions) is not list:
                raise ValueError("functions parameter has to be a list")
            self.functions = functions
            
        def __iadd__(self,func):
            self.functions.append(func)
            return self
            
        def __isub__(self,func):
            self.functions.remove(func)
            return self
            
        def __call__(self,*args,**kvargs):
            for func in self.functions : func(*args,**kvargs)
            
    @classmethod
    def addEvent(cls,**kvargs):
        """
        addEvent( event1 = [f1,f2,...], event2 = [g1,g2,...], ... )
        creates events using **kvargs to create any number of events. Each event recieves a list of functions,
        where every function in the list recieves the same parameters.
        
        Example:
        
        def hello(): print "Hello ",
        def world(): print "World"
        
        EventManager.addEvent( salute = [hello] )
        EventManager.salute += world
        
        EventManager.salute()
        
        Output:
        Hello World
        """
        for key in kvargs.keys():
            if type(kvargs[key]) is not list:
                raise ValueError("value has to be a list")
            else:
                kvargs[key] = cls.Event(kvargs[key])
        
        cls.__dict__.update(kvargs)
Cristian Garcia
źródło
8

Być może trzeba spojrzeć na pymitter ( PyPI ). Jest to małe podejście z pojedynczym plikiem (~ 250 loc) „zapewniające przestrzenie nazw, symbole wieloznaczne i TTL”.

Oto podstawowy przykład:

from pymitter import EventEmitter

ee = EventEmitter()

# decorator usage
@ee.on("myevent")
def handler1(arg):
   print "handler1 called with", arg

# callback usage
def handler2(arg):
    print "handler2 called with", arg
ee.on("myotherevent", handler2)

# emit
ee.emit("myevent", "foo")
# -> "handler1 called with foo"

ee.emit("myotherevent", "bar")
# -> "handler2 called with bar"
Dalailirium
źródło
6

Zrobiłem odmianę minimalistycznego podejścia Longpoke, które zapewnia również podpisy zarówno dla osób odbierających, jak i dzwoniących:

class EventHook(object):
    '''
    A simple implementation of the Observer-Pattern.
    The user can specify an event signature upon inizializazion,
    defined by kwargs in the form of argumentname=class (e.g. id=int).
    The arguments' types are not checked in this implementation though.
    Callables with a fitting signature can be added with += or removed with -=.
    All listeners can be notified by calling the EventHook class with fitting
    arguments.

    >>> event = EventHook(id=int, data=dict)
    >>> event += lambda id, data: print("%d %s" % (id, data))
    >>> event(id=5, data={"foo": "bar"})
    5 {'foo': 'bar'}

    >>> event = EventHook(id=int)
    >>> event += lambda wrong_name: None
    Traceback (most recent call last):
        ...
    ValueError: Listener must have these arguments: (id=int)

    >>> event = EventHook(id=int)
    >>> event += lambda id: None
    >>> event(wrong_name=0)
    Traceback (most recent call last):
        ...
    ValueError: This EventHook must be called with these arguments: (id=int)
    '''
    def __init__(self, **signature):
        self._signature = signature
        self._argnames = set(signature.keys())
        self._handlers = []

    def _kwargs_str(self):
        return ", ".join(k+"="+v.__name__ for k, v in self._signature.items())

    def __iadd__(self, handler):
        params = inspect.signature(handler).parameters
        valid = True
        argnames = set(n for n in params.keys())
        if argnames != self._argnames:
            valid = False
        for p in params.values():
            if p.kind == p.VAR_KEYWORD:
                valid = True
                break
            if p.kind not in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY):
                valid = False
                break
        if not valid:
            raise ValueError("Listener must have these arguments: (%s)"
                             % self._kwargs_str())
        self._handlers.append(handler)
        return self

    def __isub__(self, handler):
        self._handlers.remove(handler)
        return self

    def __call__(self, *args, **kwargs):
        if args or set(kwargs.keys()) != self._argnames:
            raise ValueError("This EventHook must be called with these " +
                             "keyword arguments: (%s)" % self._kwargs_str())
        for handler in self._handlers[:]:
            handler(**kwargs)

    def __repr__(self):
        return "EventHook(%s)" % self._kwargs_str()
Felk
źródło
3

Jeśli piszę kod w pyQt, używam paradygmatu gniazd / sygnałów QT, to samo dotyczy django

Jeśli wykonuję asynchroniczne operacje we / wy, użyj natywnego modułu wyboru

Jeśli używam parsera python SAX, korzystam z interfejsu API zdarzeń dostarczonego przez SAX. Wygląda na to, że jestem ofiarą bazowego API :-)

Może powinieneś zadać sobie pytanie, czego oczekujesz od frameworku / modułu eventu. Osobiście wolę używać paradygmatu Socket / Signal firmy QT. więcej informacji na ten temat można znaleźć tutaj

SashaN
źródło
2

Oto kolejny moduł do rozważenia. Wydaje się realnym wyborem dla bardziej wymagających aplikacji.

Py-powiadomienie to pakiet Pythona zapewniający narzędzia do implementacji wzorca programowania Observer. Narzędzia te obejmują sygnały, warunki i zmienne.

Sygnały to listy procedur obsługi, które są wywoływane w momencie emitowania sygnału. Warunki są w zasadzie zmiennymi logicznymi połączonymi z sygnałem, który jest emitowany, gdy zmienia się stan warunków. Można je łączyć za pomocą standardowych operatorów logicznych (nie itd.) W warunki złożone. Zmienne, w przeciwieństwie do warunków, mogą zawierać dowolny obiekt Pythona, nie tylko booleany, ale nie można ich łączyć.

Josip
źródło
1
Strona główna jest poza tym prowizją, być może nie jest już obsługiwana?
David Parks
1

Jeśli chcesz robić bardziej skomplikowane rzeczy, takie jak scalanie zdarzeń lub ponawiać próbę, możesz użyć Wzorca Observable i dojrzałej biblioteki, która to implementuje. https://github.com/ReactiveX/RxPY . Obserwowalne są bardzo powszechne w Javascript i Java i bardzo wygodne w użyciu do niektórych zadań asynchronicznych.

from rx import Observable, Observer


def push_five_strings(observer):
        observer.on_next("Alpha")
        observer.on_next("Beta")
        observer.on_next("Gamma")
        observer.on_next("Delta")
        observer.on_next("Epsilon")
        observer.on_completed()


class PrintObserver(Observer):

    def on_next(self, value):
        print("Received {0}".format(value))

    def on_completed(self):
        print("Done!")

    def on_error(self, error):
        print("Error Occurred: {0}".format(error))

source = Observable.create(push_five_strings)

source.subscribe(PrintObserver())

WYDAJNOŚĆ :

Received Alpha
Received Beta
Received Gamma
Received Delta
Received Epsilon
Done!
David Dehghan
źródło
1

Jeśli potrzebujesz magistrali zdarzeń, która działa poza granicami procesu lub sieci, możesz wypróbować PyMQ . Obecnie obsługuje pub / sub, kolejki komunikatów i synchroniczne RPC. Domyślna wersja działa na bazie backendu Redis, więc potrzebujesz działającego serwera Redis. Istnieje również backend w pamięci do testowania. Możesz także napisać własny backend.

import pymq

# common code
class MyEvent:
    pass

# subscribe code
@pymq.subscriber
def on_event(event: MyEvent):
    print('event received')

# publisher code
pymq.publish(MyEvent())

# you can also customize channels
pymq.subscribe(on_event, channel='my_channel')
pymq.publish(MyEvent(), channel='my_channel')

Aby zainicjować system:

from pymq.provider.redis import RedisConfig

# starts a new thread with a Redis event loop
pymq.init(RedisConfig())

# main application control loop

pymq.shutdown()

Oświadczenie: Jestem autorem tej biblioteki

thrau
źródło
0

Możesz spróbować buslane moduł.

Ta biblioteka ułatwia implementację systemu opartego na komunikatach. Obsługuje podejście poleceń (pojedynczy moduł obsługi) i zdarzeń (0 lub wiele modułów obsługi). Buslane używa adnotacji typu Python, aby poprawnie zarejestrować moduł obsługi.

Prosty przykład:

from dataclasses import dataclass

from buslane.commands import Command, CommandHandler, CommandBus


@dataclass(frozen=True)
class RegisterUserCommand(Command):
    email: str
    password: str


class RegisterUserCommandHandler(CommandHandler[RegisterUserCommand]):

    def handle(self, command: RegisterUserCommand) -> None:
        assert command == RegisterUserCommand(
            email='[email protected]',
            password='secret',
        )


command_bus = CommandBus()
command_bus.register(handler=RegisterUserCommandHandler())
command_bus.execute(command=RegisterUserCommand(
    email='[email protected]',
    password='secret',
))

Aby zainstalować buslane, po prostu użyj pip:

$ pip install buslane
Konrad Hałas
źródło
0

Jakiś czas temu napisałem bibliotekę, która może ci się przydać. Pozwala mieć lokalnych i globalnych słuchaczy, wiele różnych sposobów ich rejestrowania, priorytet wykonania i tak dalej.

from pyeventdispatcher import register

register("foo.bar", lambda event: print("second"))
register("foo.bar", lambda event: print("first "), -100)

dispatch(Event("foo.bar", {"id": 1}))
# first second

Spójrz pyeventdispatcher

Daniel Ancuta
źródło