Biblioteka Akka Streams ma już dość bogatą dokumentację . Jednak głównym problemem jest dla mnie to, że zawiera on zbyt dużo materiału - czuję się przytłoczony liczbą pojęć, których muszę się nauczyć. Wiele pokazanych tam przykładów jest bardzo ciężkich i nie można ich łatwo przetłumaczyć na rzeczywiste przypadki użycia, a zatem są dość ezoteryczne. Myślę, że zawiera zbyt wiele szczegółów bez wyjaśnienia, jak zbudować wszystkie bloki konstrukcyjne razem i jak dokładnie pomaga rozwiązać określone problemy.
Są źródła, ujścia, przepływy, etapy wykresu, wykresy częściowe, materializacja, DSL wykresu i wiele innych, a ja po prostu nie wiem od czego zacząć. Przewodnik szybkiego startu ma być miejscem początkowym, ale go nie rozumiem. Po prostu rzuca się w wyżej wymienione pojęcia, nie wyjaśniając ich. Ponadto przykładów kodu nie można wykonać - brakuje części, co sprawia, że mniej lub bardziej niemożliwe jest śledzenie tekstu.
Czy ktoś może wyjaśnić źródła pojęć, ujścia, przepływy, etapy wykresów, wykresy częściowe, materializację i może inne rzeczy, których mi brakowało, w prostych słowach i za pomocą łatwych przykładów, które nie wyjaśniają każdego szczegółu (i które prawdopodobnie nie są potrzebne początek)?
źródło
Odpowiedzi:
Ta odpowiedź zależy od
akka-stream
wersji2.4.2
. Interfejs API może się nieco różnić w innych wersjach. Zależność może być wykorzystana przez sbt :Dobra, zacznijmy. Interfejs API Akka Streams składa się z trzech głównych typów. W przeciwieństwie do strumieni reaktywnych , typy te są znacznie potężniejsze, a zatem bardziej złożone. Zakłada się, że dla wszystkich przykładów kodu istnieją już następujące definicje:
Te
import
stwierdzenia są potrzebne do deklaracji typu.system
reprezentuje system aktorski Akka imaterializer
reprezentuje kontekst oceny strumienia. W naszym przypadku używamy aActorMaterializer
, co oznacza, że strumienie są oceniane nad aktorami. Obie wartości są oznaczone jakoimplicit
, co daje kompilatorowi Scala możliwość automatycznego wstrzykiwania tych dwóch zależności, gdy tylko są one potrzebne. Importujemy równieżsystem.dispatcher
, który jest kontekstem wykonania dlaFutures
.Nowy interfejs API
Strumienie Akka mają następujące kluczowe właściwości:
Materializer
.Source
,Sink
orazFlow
. Bloki konstrukcyjne tworzą wykres, którego ocena opiera się naMaterializer
i musi zostać wyraźnie uruchomiona.Poniżej podano głębsze wprowadzenie do korzystania z trzech głównych typów.
Źródło
A
Source
jest twórcą danych, służy jako źródło wejściowe do strumienia. KażdySource
ma jeden kanał wyjściowy i brak kanału wejściowego. Wszystkie dane przepływają przez kanał wyjściowy do wszystkiego, co jest podłączone doSource
.Zdjęcie pochodzi z boldradius.com .
A
Source
można utworzyć na wiele sposobów:W powyższych przypadkach wprowadziliśmy
Source
skończone dane, co oznacza, że ostatecznie się zakończą. Nie należy zapominać, że strumienie reaktywne są domyślnie leniwe i asynchroniczne. Oznacza to, że należy wyraźnie poprosić o ocenę strumienia. W strumieniach Akka można to zrobićrun*
metodami.runForeach
Byłoby nie różni się od znanejforeach
funkcji - dziękirun
dodaniu czyni wyraźne, że pytamy o ocenę strumienia. Ponieważ dane skończone są nudne, kontynuujemy z nieskończonymi:Dzięki tej
take
metodzie możemy stworzyć sztuczny punkt zatrzymania, który uniemożliwi nam ocenę w nieskończoność. Ponieważ obsługa aktorów jest wbudowana, możemy również z łatwością karmić strumień wiadomościami, które są wysyłane do aktora:Widzimy, że
Futures
są wykonywane asynchronicznie na różnych wątkach, co wyjaśnia wynik. W powyższym przykładzie bufor dla przychodzących elementów nie jest konieczny i dlategoOverflowStrategy.fail
możemy skonfigurować, że strumień powinien zawieść przy przepełnieniu bufora. Zwłaszcza za pośrednictwem tego interfejsu aktora możemy przesyłać strumień z dowolnego źródła danych. Nie ma znaczenia, czy dane są tworzone przez ten sam wątek, inny wątek, inny proces, czy pochodzą ze zdalnego systemu przez Internet.Tonąć
A
Sink
jest w zasadzie przeciwieństwem aSource
. Jest to punkt końcowy strumienia i dlatego zużywa dane. ASink
ma jeden kanał wejściowy i brak kanału wyjściowego.Sinks
są szczególnie potrzebne, gdy chcemy określić zachowanie modułu gromadzącego dane w sposób wielokrotnego użytku i bez oceny strumienia. Znane jużrun*
metody nie pozwalają nam na te właściwości, dlatego zaleca się stosowanieSink
zamiast nich.Zdjęcie pochodzi z boldradius.com .
Krótki przykład
Sink
działania:Połączenie
Source
z aSink
można wykonać za pomocą tejto
metody. Zwraca tak zwanyRunnableFlow
, który jest, jak zobaczymy później, specjalną formąFlow
- strumienia, który można wykonać, po prostu wywołując jegorun()
metodę.Zdjęcie pochodzi z boldradius.com .
Oczywiście możliwe jest przekazanie aktorowi wszystkich wartości, które trafiają do zlewu:
Pływ
Źródła danych i ujścia są świetne, jeśli potrzebujesz połączenia między strumieniami Akka i istniejącym systemem, ale tak naprawdę nic z nimi nie możesz zrobić. Przepływy to ostatni brakujący element w abstrakcyjnej bazie strumieni Akka. Działają one jako łącznik między różnymi strumieniami i mogą służyć do przekształcania jego elementów.
Zdjęcie pochodzi z boldradius.com .
Jeśli a
Flow
jest podłączone doSource
nowego,Source
powstaje wynik. PodobnieFlow
połączenie z aSink
tworzy nowySink
. A wFlow
połączeniu z ASource
iSink
wyniki wRunnableFlow
. Dlatego znajdują się między kanałem wejściowym a wyjściowym, ale same w sobie nie odpowiadają jednemu ze smaków, o ile nie są podłączone do ani a,Source
ani aSink
.Zdjęcie pochodzi z boldradius.com .
Aby lepiej zrozumieć
Flows
, przyjrzymy się niektórym przykładom:Poprzez
via
metodę możemy podłączyćSource
zFlow
. Musimy określić typ danych wejściowych, ponieważ kompilator nie może nas o tym wnioskować. Jak możemy zobaczyć już w tym prostym przykładzie, przepływyinvert
idouble
są całkowicie niezależne od jakichkolwiek danych producentów i konsumentów. Przekształcają tylko dane i przekazują je do kanału wyjściowego. Oznacza to, że możemy ponownie wykorzystać przepływ między wieloma strumieniami:s1
is2
reprezentują zupełnie nowe strumienie - nie współużytkują żadnych danych przez swoje bloki konstrukcyjne.Nieograniczone strumienie danych
Zanim przejdziemy dalej, powinniśmy ponownie zapoznać się z niektórymi kluczowymi aspektami strumieni reaktywnych. Nieograniczona liczba elementów może dotrzeć w dowolnym punkcie i może umieścić strumień w różnych stanach. Oprócz strumienia uruchomialnego, który jest zwykłym stanem, strumień może zostać zatrzymany albo przez błąd, albo przez sygnał, który oznacza, że nie dotrą żadne dalsze dane. Strumień można modelować w sposób graficzny, zaznaczając zdarzenia na osi czasu, tak jak ma to miejsce:
Zdjęcie zaczerpnięte z wprowadzenia do programowania reaktywnego, którego brakowało .
Widzieliśmy już możliwe do uruchomienia przepływy w przykładach poprzedniej sekcji. Otrzymujemy
RunnableGraph
ilekroć rzeczywiście można zmaterializować strumień, co oznacza, że aSink
jest podłączony doSource
. Do tej pory zawsze materializowaliśmy się do wartościUnit
, którą można zobaczyć w typach:For
Source
iSink
parametr drugiego typu oraz dlaFlow
parametru trzeciego typu oznaczają zmaterializowaną wartość. W tej odpowiedzi pełne znaczenie materializacji nie zostanie wyjaśnione. Jednak dalsze szczegóły na temat materializacji można znaleźć w oficjalnej dokumentacji . Na razie jedyne, co musimy wiedzieć, to to, że zmaterializowana wartość jest tym, co otrzymujemy, gdy uruchamiamy strumień. Ponieważ do tej pory byliśmy zainteresowani tylko efektami ubocznymi, otrzymaliśmyUnit
wartość zmaterializowaną. Wyjątkiem było zmaterializowanie zlewu, co spowodowało:Future
. Dało nam toFuture
, ponieważ ta wartość może oznaczać zakończenie strumienia połączonego z ujściem. Dotychczasowe przykłady kodu były przyjemne do wyjaśnienia tej koncepcji, ale były również nudne, ponieważ zajmowaliśmy się tylko strumieniami skończonymi lub bardzo prostymi nieskończonymi. Aby było bardziej interesująco, poniżej zostanie wyjaśniony pełny strumień asynchroniczny i nieograniczony.Przykład ClickStream
Na przykład chcemy mieć strumień, który przechwytuje zdarzenia kliknięcia. Aby było to trudniejsze, powiedzmy, że chcemy również grupować zdarzenia kliknięcia, które mają miejsce w krótkim czasie po sobie. W ten sposób możemy łatwo odkryć podwójne, potrójne lub dziesięciokrotne kliknięcia. Ponadto chcemy odfiltrować wszystkie pojedyncze kliknięcia. Weź głęboki oddech i wyobraź sobie, jak rozwiązałbyś ten problem w sposób nadrzędny. Założę się, że nikt nie byłby w stanie wdrożyć rozwiązania, które działa poprawnie przy pierwszej próbie. Reaktywnie ten problem jest prosty do rozwiązania. W rzeczywistości rozwiązanie jest tak proste i łatwe do wdrożenia, że możemy nawet wyrazić to na diagramie, który bezpośrednio opisuje zachowanie kodu:
Zdjęcie zaczerpnięte z wprowadzenia do programowania reaktywnego, którego brakowało .
Szare pola to funkcje opisujące, w jaki sposób jeden strumień jest przekształcany w inny. Dzięki
throttle
funkcji gromadzimy kliknięcia w ciągu 250 milisekund,map
afilter
funkcje i powinny być zrozumiałe. Kolorowe kule reprezentują zdarzenie, a strzałki pokazują, jak przepływają one przez nasze funkcje. Później na etapach przetwarzania otrzymujemy coraz mniej elementów przepływających przez nasz strumień, ponieważ grupujemy je razem i odfiltrowujemy. Kod tego obrazu wyglądałby mniej więcej tak:Całą logikę można przedstawić tylko w czterech wierszach kodu! W Scali moglibyśmy napisać to jeszcze krócej:
Definicja
clickStream
jest nieco bardziej złożona, ale dzieje się tak tylko dlatego, że przykładowy program działa na JVM, gdzie przechwytywanie zdarzeń kliknięcia nie jest łatwe. Inną komplikacją jest to, że Akka domyślnie nie zapewnia tejthrottle
funkcji. Zamiast tego musieliśmy to napisać sami. Ponieważ ta funkcja jest (jak ma to miejsce w przypadku funkcjimap
lubfilter
) wielokrotnego użytku w różnych przypadkach użycia, nie liczę tych linii do liczby linii potrzebnych do implementacji logiki. Jednak w imperatywnych językach jest rzeczą normalną, że logiki nie można użyć tak łatwo i że różne logiczne kroki dzieją się w jednym miejscu, zamiast być stosowane sekwencyjnie, co oznacza, że prawdopodobnie zmienilibyśmy nasz kod z logiką dławienia. Pełny przykładowy kod jest dostępny jakoIstota i nie będzie tu więcej omawiana.Przykład SimpleWebServer
Zamiast tego należy omówić inny przykład. Chociaż strumień kliknięć jest dobrym przykładem pozwalającym strumieniom Akka obsługiwać przykład z prawdziwego świata, brakuje mu mocy do pokazania równoległego wykonywania w akcji. Następny przykład powinien reprezentować mały serwer WWW, który może obsługiwać wiele żądań równolegle. Serwer WWW powinien akceptować połączenia przychodzące i odbierać od nich sekwencje bajtów reprezentujące drukowalne znaki ASCII. Te sekwencje bajtów lub ciągi znaków powinny być podzielone przy wszystkich znakach nowej linii na mniejsze części. Następnie serwer odpowiada klientowi każdą z linii podziału. Alternatywnie może zrobić coś innego z liniami i dać specjalny token odpowiedzi, ale chcemy uprościć ten przykład i dlatego nie wprowadzamy żadnych wymyślnych funkcji. Zapamiętaj, serwer musi być w stanie obsłużyć wiele żądań jednocześnie, co w zasadzie oznacza, że żadne żądanie nie może blokować żadnego innego żądania z dalszego wykonania. Rozwiązanie wszystkich tych wymagań może być trudne w bezwzględny sposób - jednak w przypadku strumieni Akka nie powinniśmy potrzebować więcej niż kilku linii do rozwiązania któregokolwiek z nich. Najpierw omówmy sam serwer:
Zasadniczo istnieją tylko trzy główne elementy składowe. Pierwszy musi akceptować połączenia przychodzące. Drugi musi obsłużyć przychodzące żądania, a trzeci musi wysłać odpowiedź. Wdrożenie wszystkich tych trzech elementów jest tylko trochę bardziej skomplikowane niż wdrożenie strumienia kliknięć:
Funkcja
mkServer
przyjmuje (oprócz adresu i portu serwera) także system aktorów i materializator jako parametry niejawne. Przepływ sterowania serwera jest reprezentowany przezbinding
, który pobiera źródło połączeń przychodzących i przekazuje je do ujścia połączeń przychodzących. WewnątrzconnectionHandler
, który jest naszym zlewem, obsługujemy każde połączenie przepływemserverLogic
, które zostanie opisane później.binding
zwraca aFuture
, która kończy się, gdy serwer został uruchomiony lub uruchomienie nie powiodło się, co może mieć miejsce, gdy port jest już zajęty przez inny proces. Kod nie odzwierciedla jednak całkowicie grafiki, ponieważ nie widzimy bloku konstrukcyjnego, który obsługuje odpowiedzi. Powodem tego jest to, że połączenie samo zapewnia tę logikę. Jest to przepływ dwukierunkowy, a nie tylko jednokierunkowy, jak widzieliśmy w poprzednich przykładach. Tak jak w przypadku materializacji, takich złożonych przepływów nie należy tutaj wyjaśniać. Oficjalna dokumentacja ma mnóstwo materiału do pokrycia bardziej złożonych wykresów przepływu. Na razie wystarczy wiedzieć, żeTcp.IncomingConnection
reprezentuje połączenie, które wie, jak odbierać żądania i jak wysyłać odpowiedzi. Część, której wciąż brakuje, toserverLogic
blok konstrukcyjny. Może to wyglądać tak:Po raz kolejny jesteśmy w stanie podzielić logikę na kilka prostych elementów składowych, które razem tworzą przepływ naszego programu. Najpierw chcemy podzielić naszą sekwencję bajtów na linie, co musimy zrobić, gdy znajdziemy znak nowej linii. Następnie bajty każdej linii należy przekonwertować na ciąg, ponieważ praca z bajtami surowymi jest uciążliwa. Ogólnie rzecz biorąc, moglibyśmy otrzymać binarny strumień skomplikowanego protokołu, co sprawiłoby, że praca z przychodzącymi surowymi danymi była niezwykle trudna. Gdy mamy czytelny ciąg, możemy utworzyć odpowiedź. Dla uproszczenia odpowiedzią może być w naszym przypadku cokolwiek. Na koniec musimy przekonwertować naszą odpowiedź na sekwencję bajtów, które można wysłać przewodowo. Kod całej logiki może wyglądać następująco:
Wiemy już, że
serverLogic
jest to przepływ, który wymagaByteString
i musi wytworzyćByteString
. Dziękidelimiter
możemy podzielić naByteString
mniejsze części - w naszym przypadku musi to nastąpić za każdym razem, gdy pojawi się znak nowej linii.receiver
to przepływ, który bierze wszystkie sekwencje podzielonych bajtów i konwertuje je na ciąg. Jest to oczywiście niebezpieczna konwersja, ponieważ tylko drukowalne znaki ASCII powinny być konwertowane na ciąg znaków, ale na nasze potrzeby jest wystarczająca.responder
jest ostatnim komponentem i odpowiada za utworzenie odpowiedzi i konwersję odpowiedzi z powrotem do sekwencji bajtów. W przeciwieństwie do grafiki nie podzieliliśmy tego ostatniego komponentu na dwie części, ponieważ logika jest trywialna. Na koniec łączymy wszystkie przepływy przezvia
funkcjonować. W tym momencie można zapytać, czy zadbaliśmy o wspomnianą na początku własność dla wielu użytkowników. I rzeczywiście tak zrobiliśmy, chociaż może nie być to od razu oczywiste. Patrząc na tę grafikę, powinna stać się wyraźniejsza:serverLogic
Składnikiem jest niczym strumień, który zawiera mniejsze przepływy. Ten komponent pobiera dane wejściowe, które są żądaniami, i generuje dane wyjściowe, które są odpowiedzią. Ponieważ przepływy można konstruować wiele razy i wszystkie działają niezależnie od siebie, osiągamy to poprzez zagnieżdżanie naszej właściwości dla wielu użytkowników. Każde żądanie jest obsługiwane w ramach własnego żądania, dlatego też żądanie krótko działające może zastąpić wcześniej uruchomione żądanie długo działające. Jeśli zastanawiasz się, definicjęserverLogic
tego pokazaną wcześniej można oczywiście napisać znacznie krócej, wstawiając większość jej wewnętrznych definicji:Test serwera WWW może wyglądać następująco:
Aby powyższy przykład kodu działał poprawnie, najpierw musimy uruchomić serwer, który jest przedstawiony przez
startServer
skrypt:Pełny przykład kodu tego prostego serwera TCP można znaleźć tutaj . Jesteśmy w stanie napisać serwer za pomocą Akka Streams, ale także klienta. Może to wyglądać tak:
Pełny kod klienta TCP można znaleźć tutaj . Kod wygląda dość podobnie, ale w przeciwieństwie do serwera nie musimy już zarządzać połączeniami przychodzącymi.
Złożone wykresy
W poprzednich sekcjach widzieliśmy, jak możemy konstruować proste programy z przepływów. Jednak w rzeczywistości często nie wystarczy polegać na już wbudowanych funkcjach, aby konstruować bardziej złożone strumienie. Jeśli chcemy mieć możliwość korzystania ze strumieni Akka do dowolnych programów, musimy wiedzieć, jak zbudować własne niestandardowe struktury sterowania i kombinowalne przepływy, które pozwolą nam uporać się ze złożonością naszych aplikacji. Dobrą wiadomością jest to, że strumienie Akka zostały zaprojektowane w taki sposób, aby skalować je w zależności od potrzeb użytkowników. Aby dać krótkie wprowadzenie do bardziej złożonych części strumieni Akka, dodajemy więcej funkcji do naszego przykładu klient / serwer.
Jedną rzeczą, której nie możemy jeszcze zrobić, jest zamknięcie połączenia. W tym momencie zaczyna się nieco komplikować, ponieważ interfejs API strumienia, który widzieliśmy do tej pory, nie pozwala nam zatrzymać strumienia w dowolnym miejscu. Istnieje jednak
GraphStage
abstrakcja, za pomocą której można tworzyć dowolne etapy przetwarzania wykresów z dowolną liczbą portów wejściowych lub wyjściowych. Przyjrzyjmy się najpierw stronie serwera, gdzie wprowadzamy nowy komponent o nazwiecloseConnection
:Ten interfejs API wygląda o wiele bardziej skomplikowany niż interfejs API flow. Nic dziwnego, że musimy tutaj zrobić wiele koniecznych kroków. W zamian mamy większą kontrolę nad zachowaniem naszych strumieni. W powyższym przykładzie podajemy tylko jeden port wejściowy i jeden port wyjściowy i udostępniamy je systemowi, zastępując
shape
wartość. Ponadto zdefiniowaliśmy tak zwaneInHandler
aOutHandler
, które w tej kolejności odpowiadają za odbieranie i wysyłanie elementów. Jeśli przyjrzałeś się przykładowi pełnego strumienia kliknięć, powinieneś już rozpoznać te komponenty. WInHandler
chwytamy element i jeśli jest to ciąg znaków z jednym znakiem'q'
, chcemy zamknąć strumień. Aby dać klientowi szansę dowiedzieć się, że strumień zostanie wkrótce zamknięty, emitujemy ciąg"BYE"
a następnie natychmiast zamykamy scenę.closeConnection
Składnik może być łączony ze strumieniem za pomocąvia
sposobu, który został wprowadzony w części o przepływach.Oprócz możliwości zamykania połączeń, byłoby również miło, gdybyśmy mogli wyświetlić wiadomość powitalną dla nowo utworzonego połączenia. Aby to zrobić, musimy jeszcze raz pójść nieco dalej:
Funkcja
serverLogic
przyjmuje teraz połączenie przychodzące jako parametr. Wewnątrz jego ciała używamy DSL, który pozwala nam opisać złożone zachowanie strumienia. Dzięki temuwelcome
tworzymy strumień, który może emitować tylko jeden element - wiadomość powitalną.logic
jest tak, jak opisanoserverLogic
w poprzedniej sekcji. Jedyną zauważalną różnicą jest to, żecloseConnection
do tego dodaliśmy . Teraz faktycznie nadchodzi interesująca część DSL. TaGraphDSL.create
funkcja udostępnia konstruktorab
, który służy do wyrażania strumienia jako wykresu. Za pomocą tej~>
funkcji można ze sobą łączyć porty wejściowe i wyjściowe.Concat
Składnik, który jest używany w tym przykładzie można łączyć elementy i jest tu stosowany do poprzedź wiadomość powitalną przed innymi elementami, które pochodzą zinternalLogic
. W ostatnim wierszu udostępniamy tylko port wejściowy logiki serwera i port wyjściowy konkatenowanego strumienia, ponieważ wszystkie pozostałe porty powinny pozostać szczegółami implementacjiserverLogic
komponentu. Aby uzyskać szczegółowe wprowadzenie do wykresu DSL strumieni Akka, odwiedź odpowiednią sekcję w oficjalnej dokumentacji . Pełny przykład kodu złożonego serwera TCP i klienta, który może się z nim komunikować, można znaleźć tutaj . Za każdym razem, gdy otwierasz nowe połączenie od klienta, powinieneś zobaczyć wiadomość powitalną, a po wpisaniu"q"
na kliencie powinieneś zobaczyć komunikat informujący, że połączenie zostało anulowane.Wciąż jest kilka tematów, które nie zostały ujęte w tej odpowiedzi. Zwłaszcza materializacja może przestraszyć jednego lub drugiego czytelnika, ale jestem pewien, że dzięki omawianemu tutaj materiałowi każdy powinien być w stanie samodzielnie przejść kolejne kroki. Jak już wspomniano, oficjalna dokumentacja jest dobrym miejscem do dalszej nauki o strumieniach Akka.
źródło