Czy IObserver .NET <T> był przeznaczony do subskrybowania wielu IObservable?

9

Istnieją interfejsy IObservable i IObserver w .NET (także tu i tutaj ). Co ciekawe, konkretne wdrożenie IObserver nie zawiera bezpośredniego odniesienia do IObservable. Nie wie, kto jest subskrybentem. Może tylko wywołać subskrybenta. „Wyciągnij pinezkę, aby anulować subskrypcję”.

edycja: Subskrybent implementuje IDisposable. Myślę, że ten schemat został zastosowany, aby zapobiec problemowi z wygasłym słuchaczem .

Jednak dwie rzeczy nie są do końca jasne.

  1. Czy wewnętrzna klasa Unsubscriber zapewnia zachowanie „zapisz się i zapomnij”? Kto (i kiedy dokładnie) wzywa IDisposable.Dispose()abonenta? Garbage collector (GC) nie jest deterministyczny.
    [Uwaga: ogólnie, spędziłem więcej czasu z C i C ++ niż z C #.]
  2. Co powinno się stać, jeśli chcę subskrybować obserwatora K do obserwowalnego L1, a obserwator jest już subskrybowany do innego obserwowalnego L2?

    K.Subscribe(L1);
    K.Subscribe(L2);
    K.Unsubscribe();
    L1.PublishObservation(1003);
    L2.PublishObservation(1004);
    

    Kiedy uruchomiłem ten kod testowy na przykładzie MSDN, obserwator pozostał subskrybowany do L1. Byłoby to szczególne w prawdziwym rozwoju. Potencjalnie istnieją 3 sposoby na poprawę tego:

    • Jeśli obserwator ma już instancję subskrybenta (tj. Jest już subskrybowany), wówczas cicho wypisuje się z pierwotnego dostawcy przed subskrybowaniem nowego. Takie podejście ukrywa fakt, że nie jest już subskrybentem pierwotnego dostawcy, co może później stać się niespodzianką.
    • Jeśli obserwator ma już instancję subskrybenta, zgłasza wyjątek. Dobrze zachowujący się kod wywołujący musi jawnie wypisać się z listy obserwatorów.
    • Obserwator subskrybuje wielu dostawców. Jest to najbardziej intrygująca opcja, ale czy można to zaimplementować za pomocą IObservable i IObserver? Zobaczmy. Obserwator może przechowywać listę obiektów niesubskrybujących: po jednym dla każdego źródła. Niestety IObserver.OnComplete()nie udostępnia referencji dostawcy, który ją wysłał. Tak więc implementacja IObserver z wieloma dostawcami nie byłaby w stanie określić, z którego z nich zrezygnować.
  3. Czy IObserver .NET był przeznaczony do subskrybowania wielu IObservables?
    Czy podręcznikowa definicja wzorca obserwatora wymaga, aby jeden obserwator mógł subskrybować wielu dostawców? Czy jest to opcjonalne i zależy od implementacji?

Nick Alexeev
źródło

Odpowiedzi:

5

Dwa interfejsy są w rzeczywistości częścią Reactive Extensions (w skrócie Rx), powinieneś korzystać z tej biblioteki prawie zawsze, gdy chcesz z nich korzystać.

Interfejsy są technicznie w mscrolib, a nie w żadnym z zespołów Rx. Myślę, że ma to na celu ułatwienie interoperacyjności: w ten sposób biblioteki takie jak TPL Dataflow mogą zapewniać członków, którzy pracują z tymi interfejsami , bez faktycznego odwoływania się do Rx.

Jeśli użyjesz Rx Subjectjako swojej implementacji IObservable, Subscribezwróci wartość, IDisposablektórej można użyć do anulowania subskrypcji:

var observable = new Subject<int>();

var unsubscriber =
    observable.Subscribe(Observer.Create<int>(i => Console.WriteLine("1: {0}", i)));
observable.Subscribe(Observer.Create<int>(i => Console.WriteLine("2: {0}", i)));

unsubscriber.Dispose();

observable.OnNext(1003);
observable.OnNext(1004);
svick
źródło
5

Aby wyjaśnić kilka rzeczy, które są dobrze udokumentowane w oficjalnych Wytycznych Projektowych Rx i szczegółowo na mojej stronie IntroToRx.com :

  • Nie polegasz na GC, aby oczyścić swoje subskrypcje. Szczegółowo omówione tutaj
  • Nie ma Unsubscribemetody. Subskrybujesz obserwowalną sekwencję i otrzymujesz subskrypcję . Następnie możesz pozbyć się tej subskrypcji, wskazując, że nie chcesz już wywoływać wywołań zwrotnych.
  • Obserwowalnej sekwencji nie można ukończyć więcej niż jeden raz (patrz sekcja 4 Wytycznych projektowych Rx).
  • Istnieje wiele sposobów konsumowania wielu możliwych do zaobserwowania sekwencji. Istnieje również wiele informacji na ten temat w Reactivex.io i ponownie w IntroToRx.

Aby być konkretnym i bezpośrednio odpowiedzieć na pierwotne pytanie, twoje użycie jest od początku do końca. Nie wpychasz wielu obserwowalnych sekwencji w jednego obserwatora. Komponujesz obserwowalne sekwencje w pojedynczą obserwowalną sekwencję. Następnie subskrybujesz tę pojedynczą sekwencję.

Zamiast

K.Subscribe(L1);
K.Subscribe(L2);
K.Unsubscribe();
L1.PublishObservation(1003);
L2.PublishObservation(1004);

Który jest tylko pseudo kodem i nie działałby w implementacji Rx .NET, powinieneś wykonać następujące czynności:

var source1 = new Subject<int>(); //was L1
var source2 = new Subject<int>(); //was L2

var subscription = source1
    .Merge(source2)
    .Subscribe(value=>Console.WriteLine("OnNext({0})", value));


source1.OnNext(1003);
source2.OnNext(1004);

subscription.Dispose();

Teraz to nie pasuje do początkowego pytania, ale nie wiem, co K.Unsubscribe()powinno być zrobione (zrezygnować ze wszystkich, ostatniej czy pierwszej subskrypcji ?!)

Lee Campbell
źródło
Czy mogę po prostu zamknąć obiekt subskrypcji w bloku „używającym”?
Robert Oschler
1
W tym przypadku synchronicznym możesz, jednak Rx ma być asynchroniczny. W przypadku asynchronicznym zwykle nie można użyć usingbloku. Koszt wyciągu z subskrypcji powinien być praktycznie zerowy, więc wyzeruj blok używający, subskrybuj, pozostaw blok blokujący (a tym samym zrezygnuj z subskrypcji), czyniąc kod raczej bezcelowym
Lee Campbell
3

Masz rację. Przykład działa źle dla wielu IObservables.

Myślę, że OnComplete () nie zapewnia referencji, ponieważ nie chcą, aby IObservable musiał ją utrzymywać. Gdybym pisał, że prawdopodobnie wspierałbym wiele subskrypcji, każąc Subskrybować wziąć identyfikator jako drugi parametr, który jest przekazywany z powrotem do wywołania OnComplete (). Więc możesz powiedzieć

K.Subscribe(L1,"L1")
K.Subscribe(L2,"L2")
K.Unsubscribe("L1")

W tej chwili wygląda na to, że .NET IObserver nie jest odpowiedni dla wielu obserwatorów. Ale przypuszczam, że mógłby to być twój główny obiekt (w tym przykładzie LocationReporter)

public Dictionary<String,IObserver> Observers;

i to umożliwi ci wsparcie

K.Subscribe(L1,"L1")
K.Subscribe(L2,"L2")
K.Unsubscribe("L1")

także.

Podejrzewam, że Microsoft mógłby argumentować, że w związku z tym nie ma potrzeby bezpośredniego wspierania wielu IObservables w interfejsach.

psr
źródło
Myślałem również, że obserwowalna implementacja może mieć listę obserwatorów. Ja też zauważyłem, że IObserver.OnComplete()nie identyfikuje, kto pochodzi z połączenia. Jeśli obserwator jest subskrybentem więcej niż jednego obserwowalnego, nie wie, od kogo się wypisać. Rozczarowujący. Zastanawiam się, czy .NET ma lepszy interfejs dla wzorca obserwatora?
Nick Alexeev
Jeśli chcesz mieć odniesienie do czegoś, powinieneś użyć odwołania, a nie ciągu.
sick
Ta odpowiedź pomogła mi z prawdziwym błędem. Użyłem Observable.Create()do zbudowania obserwowalnego i używając do tego szeregu obserwowalnych źródeł Subscribe(). Nieumyślnie przekazałem ukończony obserwowalny w jednej ścieżce kodu. To uzupełniło moje nowo utworzone obserwowalne, mimo że inne źródła nie były kompletne. Zajęło mi wieki, aby się zorientować, co muszę zrobić - przełącznik Observable.Empty()dla Observable.Never().
Olly,
0

Wiem, że to jest sposób późno do partii, ale ...

Interfejsy I Observable<T>i nieIObserver<T> są częścią Rx ... są to typy podstawowe ... ale Rx szeroko z nich korzysta.

Możesz mieć dowolną liczbę (lub tak niewielu) obserwatorów. Jeśli spodziewasz się wielu obserwatorów, obowiązkiem obserwatora jest kierowanie OnNext()połączeń do odpowiednich obserwatorów dla każdego obserwowanego zdarzenia. Obserwowalny może wymagać listy lub słownika, jak sugerujesz.

Są dobre przypadki na dopuszczenie tylko jednego - i dobre przypadki na dopuszczenie wielu. Na przykład w implementacji CQRS / ES, można wymusić jeden obsługi poleceń za poleceń typu w autobusie poleceń, a może powiadomić kilka transformacji odczytu ubocznych dla danego zdarzenia typu w składnicy zdarzeń.

Jak stwierdzono w innych odpowiedziach, nie ma Unsubscribe. Pozbywanie się rzeczy, które dostajesz, gdy Subscribegeneralnie wykonujesz brudną robotę. Obserwator lub jego agent jest odpowiedzialny za trzymanie tokena, dopóki nie będzie już chciał otrzymywać dalszych powiadomień . (pytanie 1)

W twoim przykładzie:

K.Subscribe(L1);
K.Subscribe(L2);
K.Unsubscribe();
L1.PublishObservation(1003);
L2.PublishObservation(1004);

... byłoby bardziej jak:

using ( var l1Token = K.Subscribe( L1 ) )
{
  using ( var l2Token = K.Subscribe( L2 );
  {
    L1.PublishObservation( 1003 );
    L2.PublishObservation( 1004 );
  } //--> effectively unsubscribing to L2 here

  L2.PublishObservation( 1005 );
}

... gdzie K usłyszy 1003 i 1004, ale nie 1005.

Dla mnie nadal wygląda to zabawnie, ponieważ nominalnie subskrypcje to rzeczy długowieczne ... często na czas trwania programu. Nie różnią się pod tym względem zwykłymi zdarzeniami .Net.

W wielu przykładach, które widziałem, Disposetoken działa usuwając obserwatora z listy obserwatorów. Wolę, aby token nie zawierał tyle wiedzy wokół ... i dlatego uogólniłem tokeny subskrypcji, aby wywoływały przekazaną lambda (z informacjami identyfikującymi przechwyconymi w czasie subskrypcji:

public class SubscriptionToken<T>: IDisposable
{
  private readonly Action unsubscribe;

  private SubscriptionToken( ) { }
  public SubscriptionToken( Action unsubscribe )
  {
    this.unsubscribe = unsubscribe;
  }

  public void Dispose( )
  {
    unsubscribe( );
  }
}

... a obserwowalne mogą zainstalować zachowanie anulowania subskrypcji podczas subskrypcji:

IDisposable Subscribe<T>( IObserver<T> observer )
{
  var subscriberId = Guid.NewGuid( );
  subscribers.Add( subscriberId, observer );

  return new SubscriptionToken<T>
  (
    ( ) =>
    subscribers.Remove( subscriberId );
  );
}

Jeśli Twój obserwator przechwytuje zdarzenia z wielu obserwowalnych obiektów, możesz chcieć upewnić się, że w samych zdarzeniach znajduje się jakiś rodzaj korelacji ... tak jak w przypadku zdarzeń .Net sender. To od Ciebie zależy, czy to ma znaczenie, czy nie. Nie jest upieczony, jak prawidłowo uzasadniłeś. (pytanie 3)

Glina
źródło