Programowanie sieci asynchronicznej przy użyciu rozszerzeń reaktywnych

25

Po wykonaniu (mniej więcej) asynchronicznego socketprogramowania „niskiego poziomu” wiele lat temu (w sposób oparty na zdarzeniu w oparciu o asynchroniczny wzorzec asynchroniczny (EAP) ) i ostatnio przeniesieniu „w górę” do TcpListener(model programowania asynchronicznego (APM) ), a następnie próbując przejść do async/await( TAP) opartego na zadaniach (ang. Task-Asynchronous Pattern, TAP) ) Prawie to robiłem, ponieważ musiałem przejmować się całym tym „niskim poziomem hydrauliki”. Więc pomyślałem; dlaczego nie spróbować RX( Reactive Extensions ), ponieważ może lepiej pasować do mojej problematycznej domeny.

Dużo kodu, który piszę, musi być z wieloma klientami łączącymi się z Tcp z moją aplikacją, która następnie rozpoczyna dwukierunkową (asynchroniczną) komunikację. Klient lub serwer może w dowolnym momencie zdecydować, że wiadomość musi zostać wysłana i zrobić to, więc nie jest to twoja klasyczna request/responsekonfiguracja, ale raczej dwustronna „linia” w czasie rzeczywistym, otwarta dla obu stron, aby wysłać co tylko zechce. , kiedy tylko chcą. (Jeśli ktoś ma dobre imię, by to opisać, chętnie to usłyszę!).

„Protokół” różni się w zależności od aplikacji (i tak naprawdę nie ma związku z moim pytaniem). Mam jednak wstępne pytanie:

  1. Biorąc pod uwagę, że działa tylko jeden „serwer”, ale musi on śledzić wiele (zwykle tysiące) połączeń (np. Klientów), z których każde ma (z braku lepszego opisu) własną „maszynę stanu”, aby śledzić swoje wewnętrzne stwierdza itp., jakie podejście wolisz? EAP / TAP / APM? Czy RX byłby nawet uważany za opcję? Jeśli nie to dlaczego?

Muszę więc pracować w trybie asynchronicznym, ponieważ a) nie jest to protokół żądania / odpowiedzi, więc nie mogę mieć wątku / klienta w „czekającym na wiadomość” - blokującym połączeniu lub „wysyłającym wiadomość” - blokującym połączeniu (jednak, jeśli wysyłanie jest blokowanie tylko dla tego klienta, z którym mógłbym z nim żyć) ib) muszę obsłużyć wiele jednoczesnych połączeń. Nie widzę możliwości (niezawodnego) wykonania tego przy użyciu blokowania połączeń.

Większość moich aplikacji jest związanych z VoiP; czy to wiadomości SIP od klientów SIP, czy wiadomości PBX (powiązane) z aplikacji takich jak FreeSwitch / OpenSIPS itp., ale możesz, w najprostszej formie, wyobrazić sobie serwer „czatu” próbujący obsłużyć wielu klientów „czatu”. Większość protokołów jest oparta na tekście (ASCII).

Po wdrożeniu wielu różnych permutacji wyżej wymienionych technik chciałbym uprościć moją pracę, tworząc obiekt, który mogę po prostu utworzyć, wskazać, na co IPEndpointma słuchać, i powiedzieć mi, gdy dzieje się coś interesującego (co zwykle użyj zdarzeń dla, więc niektóre EAP są zwykle mieszane z pozostałymi dwiema technikami). Klasa nie powinna zawracać sobie głowy próbą „zrozumienia” protokołu; powinien po prostu obsługiwać łańcuchy przychodzące / wychodzące. I tak, mając oko na RX, mając nadzieję, że to (w końcu) uprości pracę, stworzyłem od nowa nowe „skrzypce”:

using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Reactive.Linq;
using System.Text;

class Program
{
    static void Main(string[] args)
    {
        var f = new FiddleServer(new IPEndPoint(IPAddress.Any, 8084));
        f.Start();
        Console.ReadKey();
        f.Stop();
        Console.ReadKey();
    }
}

public class FiddleServer
{
    private TcpListener _listener;
    private ConcurrentDictionary<ulong, FiddleClient> _clients;
    private ulong _currentid = 0;

    public IPEndPoint LocalEP { get; private set; }

    public FiddleServer(IPEndPoint localEP)
    {
        this.LocalEP = localEP;
        _clients = new ConcurrentDictionary<ulong, FiddleClient>();
    }

    public void Start()
    {
        _listener = new TcpListener(this.LocalEP);
        _listener.Start();
        Observable.While(() => true, Observable.FromAsync(_listener.AcceptTcpClientAsync)).Subscribe(
            //OnNext
            tcpclient =>
            {
                //Create new FSClient with unique ID
                var fsclient = new FiddleClient(_currentid++, tcpclient);
                //Keep track of clients
                _clients.TryAdd(fsclient.ClientId, fsclient);
                //Initialize connection
                fsclient.Send("connect\n\n");

                Console.WriteLine("Client {0} accepted", fsclient.ClientId);
            },
            //OnError
            ex =>
            {

            },
            //OnComplete
            () =>
            {
                Console.WriteLine("Client connection initialized");
                //Accept new connections
                _listener.AcceptTcpClientAsync();
            }
        );
        Console.WriteLine("Started");
    }

    public void Stop()
    {
        _listener.Stop();
        Console.WriteLine("Stopped");
    }

    public void Send(ulong clientid, string rawmessage)
    {
        FiddleClient fsclient;
        if (_clients.TryGetValue(clientid, out fsclient))
        {
            fsclient.Send(rawmessage);
        }
    }
}

public class FiddleClient
{
    private TcpClient _tcpclient;

    public ulong ClientId { get; private set; }

    public FiddleClient(ulong id, TcpClient tcpclient)
    {
        this.ClientId = id;
        _tcpclient = tcpclient;
    }

    public void Send(string rawmessage)
    {
        Console.WriteLine("Sending {0}", rawmessage);
        var data = Encoding.ASCII.GetBytes(rawmessage);
        _tcpclient.GetStream().WriteAsync(data, 0, data.Length);    //Write vs WriteAsync?
    }
}

Zdaję sobie sprawę, że w tym „skrzypcach” jest trochę szczegółów związanych z implementacją; w tym przypadku pracuję z FreeSwitch ESL, więc "connect\n\n"skrzypce powinny zostać usunięte podczas refaktoryzacji do bardziej ogólnego podejścia.

Wiem również, że muszę refaktoryzować metody anonimowe do metod instancji prywatnej w klasie Server; Po prostu nie jestem pewien, jakiej konwencji (np. „ OnSomething”) Użyć dla ich nazw metod?

To jest moja podstawa / punkt początkowy / fundament (który wymaga trochę „ulepszenia”). Mam kilka pytań na ten temat:

  1. Zobacz powyższe pytanie „1”
  2. Czy jestem na dobrej drodze? Czy moje decyzje „projektowe” są niesprawiedliwe?
  3. Pod względem współbieżności: czy poradziłoby sobie z tysiącami klientów (parsowanie / obsługa rzeczywistych wiadomości na bok)
  4. O wyjątkach: Nie jestem pewien, jak uzyskać wyjątki zgłaszane przez klientów „do” serwera („RX-mądry”); jaki byłby dobry sposób?
  5. Mogę teraz uzyskać dowolnego podłączonego klienta z mojej klasy serwera (używając go ClientId), zakładając, że narażam klientów w taki czy inny sposób i wywołuję metody bezpośrednio na nich. Mogę również wywoływać metody za pośrednictwem klasy Server (na przykład Send(clientId, rawmessage)metoda (podczas gdy to drugie podejście byłoby metodą „wygodną” do szybkiego przesyłania wiadomości na drugą stronę).
  6. Nie jestem do końca pewien, gdzie (i jak) iść stąd:
    • a) Muszę obsługiwać wiadomości przychodzące; jak mam to skonfigurować? Mogę pobrać strumień kursu, ale gdzie miałbym poradzić sobie z odzyskiwaniem odebranych bajtów? Myślę, że potrzebuję czegoś w rodzaju „ObservableStream”, do którego mogę się zapisać? Czy umieściłbym to w FiddleClientlub FiddleServer?
    • b) Zakładając, że chcę uniknąć używania zdarzenia, dopóki te FiddleClient/ FiddleServerklasy nie zostaną zaimplementowane bardziej konkretnie, aby dostosować ich protokół do konkretnej aplikacji itp. za pomocą bardziej szczegółowych FooClient/ FooServerklas: w jaki sposób miałbym przejść od pobierania danych z podstawowych klas „Fiddle” do ich bardziej konkretne odpowiedniki?

Artykuły / linki, które już przeczytałem / przejrzałem / wykorzystałem w celach informacyjnych:

RobIII
źródło
Spójrz na istniejącą bibliotekę ReactiveSockets
Flagbug
2
Nie szukam bibliotek ani linków (choć są one mile widziane), ale do wkładu / porady / pomocy w moich pytaniach i ogólnej konfiguracji. Chcę nauczyć się ulepszać swój własny kod i móc lepiej zdecydować, w jakim kierunku to wziąć, rozważyć zalety i wady itp. Nie odwołując się do jakiejś biblioteki, upuść ją i przejdź dalej. Chcę wyciągnąć wnioski z tego doświadczenia i zdobyć więcej doświadczenia w programowaniu Rx / sieci.
RobIII
Jasne, ale ponieważ biblioteka jest oprogramowaniem typu open source, możesz zobaczyć, jak się tam zaimplementowano
Flagbug
1
Jasne, ale spojrzenie na kod źródłowy nie wyjaśnia, dlaczego podjęto / podjęto pewne decyzje projektowe. A ponieważ jestem stosunkowo nowy w Rx w połączeniu z programowaniem sieciowym, nie mam wystarczającego doświadczenia, aby stwierdzić, czy ta biblioteka jest dobra, czy projekt ma sens, czy zostały podjęte właściwe decyzje, a nawet jeśli jest to dla mnie odpowiednie.
RobIII
Myślę, że dobrze będzie przemyśleć serwer jako aktywnego członka uzgadniania, aby serwer inicjował połączenie zamiast nasłuchiwać połączeń. Na przykład tutaj: codeproject.com/Articles/20250/Reverse-Connection-Shell

Odpowiedzi:

1

... Chciałbym uprościć moją pracę, tworząc obiekt, który mogę po prostu utworzyć, wskazać, którego IPEndpoint ma słuchać, i informować mnie, gdy dzieje się coś interesującego ...

Po przeczytaniu tego oświadczenia od razu pomyślałem o „aktorach”. Aktorzy są bardzo podobni do obiektów, z tym wyjątkiem, że mają tylko jedno wejście, w którym przekazujesz mu wiadomość (zamiast bezpośrednio wywoływać metody obiektu) i działają asynchronicznie. W bardzo uproszczonym przykładzie ... stworzyłbyś aktora i wysłałeś mu wiadomość z punktem końcowym IPE i adresem aktora, na który chcesz wysłać wynik. Znika i działa w tle. Odpowiadasz tylko wtedy, gdy wydarzy się „coś interesującego”. Możesz utworzyć dowolną liczbę aktorów, aby obsłużyć obciążenie.

Nie znam bibliotek aktorów w .Net, chociaż wiem, że istnieją. Znam bibliotekę TPL Dataflow (będzie o niej rozdział w mojej książce http://DataflowBook.com ) i za pomocą tej biblioteki powinno być łatwo zaimplementować prosty model aktora.

Matt Carkci
źródło
To wygląda interesująco. Przygotuję projekt zabawki, aby sprawdzić, czy jest dla mnie odpowiedni. Dzieki za sugestie.
RobIII