Jak zacząć korzystać z Akka Streams? [Zamknięte]

222

Biblioteka Akka Streams ma już dość bogatą dokumentację . Jednak głównym problemem jest dla mnie to, że zawiera on zbyt dużo materiału - czuję się przytłoczony liczbą pojęć, których muszę się nauczyć. Wiele pokazanych tam przykładów jest bardzo ciężkich i nie można ich łatwo przetłumaczyć na rzeczywiste przypadki użycia, a zatem są dość ezoteryczne. Myślę, że zawiera zbyt wiele szczegółów bez wyjaśnienia, jak zbudować wszystkie bloki konstrukcyjne razem i jak dokładnie pomaga rozwiązać określone problemy.

Są źródła, ujścia, przepływy, etapy wykresu, wykresy częściowe, materializacja, DSL wykresu i wiele innych, a ja po prostu nie wiem od czego zacząć. Przewodnik szybkiego startu ma być miejscem początkowym, ale go nie rozumiem. Po prostu rzuca się w wyżej wymienione pojęcia, nie wyjaśniając ich. Ponadto przykładów kodu nie można wykonać - brakuje części, co sprawia, że ​​mniej lub bardziej niemożliwe jest śledzenie tekstu.

Czy ktoś może wyjaśnić źródła pojęć, ujścia, przepływy, etapy wykresów, wykresy częściowe, materializację i może inne rzeczy, których mi brakowało, w prostych słowach i za pomocą łatwych przykładów, które nie wyjaśniają każdego szczegółu (i które prawdopodobnie nie są potrzebne początek)?

kiritsuku
źródło
2
Aby uzyskać więcej informacji, jest to omawiane na stronie meta
DavidG,
10
Jako pierwsza osoba, która zagłosuje za zamknięciem tego (zgodnie z wątkiem Meta), pozwól mi najpierw powiedzieć, że twoja odpowiedź tutaj jest świetna . Jest to naprawdę dogłębne i jest z pewnością bardzo pomocnym zasobem. Niestety, pytanie, które zadałeś, jest o wiele za szerokie, aby można było je przepełnić. Jeśli jakoś twoja odpowiedź może być wysłana na pytanie o innym brzmieniu, to niesamowite, ale nie sądzę, że może być. Radzę ponownie przesłać to jako wpis na blogu lub coś podobnego, który ty i inni możecie wykorzystać jako źródło odniesienia w przyszłych odpowiedziach.
James Donnelly,
2
Myślę, że napisanie tego pytania jako posta na blogu nie byłoby skuteczne. Tak, to szerokie pytanie - i to naprawdę dobre pytanie. Zawężenie jego zakresu nie poprawiłoby go. Podana odpowiedź jest wspaniała. Jestem pewien, że Quora chętnie odbierze biznes SO od wielkich pytań.
Mike Slinn
11
@ MikeSlinn nie próbuj rozmawiać z SO o odpowiednich pytaniach, ślepo przestrzegają zasad. Tak długo, jak pytanie nie zostanie usunięte, jestem szczęśliwy i nie chcę przenosić się na inną platformę.
kiritsuku
2
@sschaef How pedantic. Tak, oczywiście zasady nie są nic warte, twoje wspaniałe ja wie o wiele lepiej i wszyscy próbujący je stosować, ślepo podążają za szumem. /tyrada. a co ważniejsze, byłby to świetny dodatek do wersji beta dokumentacji, jeśli w niej jesteś. Nadal możesz go złożyć i tam umieścić, ale powinieneś przynajmniej zauważyć, że nie jest to idealne rozwiązanie dla głównej strony.
Félix Gagnon-Grenier

Odpowiedzi:

506

Ta odpowiedź zależy od akka-streamwersji 2.4.2. Interfejs API może się nieco różnić w innych wersjach. Zależność może być wykorzystana przez sbt :

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.2"

Dobra, zacznijmy. Interfejs API Akka Streams składa się z trzech głównych typów. W przeciwieństwie do strumieni reaktywnych , typy te są znacznie potężniejsze, a zatem bardziej złożone. Zakłada się, że dla wszystkich przykładów kodu istnieją już następujące definicje:

import scala.concurrent._
import akka._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.util._

implicit val system = ActorSystem("TestSystem")
implicit val materializer = ActorMaterializer()
import system.dispatcher

Te importstwierdzenia są potrzebne do deklaracji typu. systemreprezentuje system aktorski Akka i materializerreprezentuje kontekst oceny strumienia. W naszym przypadku używamy a ActorMaterializer, co oznacza, że ​​strumienie są oceniane nad aktorami. Obie wartości są oznaczone jako implicit, co daje kompilatorowi Scala możliwość automatycznego wstrzykiwania tych dwóch zależności, gdy tylko są one potrzebne. Importujemy również system.dispatcher, który jest kontekstem wykonania dla Futures.

Nowy interfejs API

Strumienie Akka mają następujące kluczowe właściwości:

  • Wdrażają specyfikację strumieni reaktywnych , której trzy główne cele, przeciwciśnienie, asynchroniczne i nieblokujące granice oraz interoperacyjność między różnymi implementacjami, w pełni dotyczą również strumieni Akka.
  • Zapewniają one abstrakcję dla silnika oceny dla strumieni, który jest nazywany Materializer.
  • Programy są formułowane jako bloki konstrukcyjne wielokrotnego użytku, które są reprezentowane jako trzy główne typy Source, Sinkoraz Flow. Bloki konstrukcyjne tworzą wykres, którego ocena opiera się na Materializeri musi zostać wyraźnie uruchomiona.

Poniżej podano głębsze wprowadzenie do korzystania z trzech głównych typów.

Źródło

A Sourcejest twórcą danych, służy jako źródło wejściowe do strumienia. Każdy Sourcema jeden kanał wyjściowy i brak kanału wejściowego. Wszystkie dane przepływają przez kanał wyjściowy do wszystkiego, co jest podłączone do Source.

Źródło

Zdjęcie pochodzi z boldradius.com .

A Sourcemożna utworzyć na wiele sposobów:

scala> val s = Source.empty
s: akka.stream.scaladsl.Source[Nothing,akka.NotUsed] = ...

scala> val s = Source.single("single element")
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...

scala> val s = Source(1 to 3)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val s = Source(Future("single value from a Future"))
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...

scala> s runForeach println
res0: scala.concurrent.Future[akka.Done] = ...
single value from a Future

W powyższych przypadkach wprowadziliśmy Sourceskończone dane, co oznacza, że ​​ostatecznie się zakończą. Nie należy zapominać, że strumienie reaktywne są domyślnie leniwe i asynchroniczne. Oznacza to, że należy wyraźnie poprosić o ocenę strumienia. W strumieniach Akka można to zrobić run*metodami. runForeachByłoby nie różni się od znanej foreachfunkcji - dzięki rundodaniu czyni wyraźne, że pytamy o ocenę strumienia. Ponieważ dane skończone są nudne, kontynuujemy z nieskończonymi:

scala> val s = Source.repeat(5)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> s take 3 runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
5
5
5

Dzięki tej takemetodzie możemy stworzyć sztuczny punkt zatrzymania, który uniemożliwi nam ocenę w nieskończoność. Ponieważ obsługa aktorów jest wbudowana, możemy również z łatwością karmić strumień wiadomościami, które są wysyłane do aktora:

def run(actor: ActorRef) = {
  Future { Thread.sleep(300); actor ! 1 }
  Future { Thread.sleep(200); actor ! 2 }
  Future { Thread.sleep(100); actor ! 3 }
}
val s = Source
  .actorRef[Int](bufferSize = 0, OverflowStrategy.fail)
  .mapMaterializedValue(run)

scala> s runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
3
2
1

Widzimy, że Futuressą wykonywane asynchronicznie na różnych wątkach, co wyjaśnia wynik. W powyższym przykładzie bufor dla przychodzących elementów nie jest konieczny i dlatego OverflowStrategy.failmożemy skonfigurować, że strumień powinien zawieść przy przepełnieniu bufora. Zwłaszcza za pośrednictwem tego interfejsu aktora możemy przesyłać strumień z dowolnego źródła danych. Nie ma znaczenia, czy dane są tworzone przez ten sam wątek, inny wątek, inny proces, czy pochodzą ze zdalnego systemu przez Internet.

Tonąć

A Sinkjest w zasadzie przeciwieństwem a Source. Jest to punkt końcowy strumienia i dlatego zużywa dane. A Sinkma jeden kanał wejściowy i brak kanału wyjściowego. Sinkssą szczególnie potrzebne, gdy chcemy określić zachowanie modułu gromadzącego dane w sposób wielokrotnego użytku i bez oceny strumienia. Znane już run*metody nie pozwalają nam na te właściwości, dlatego zaleca się stosowanie Sinkzamiast nich.

Tonąć

Zdjęcie pochodzi z boldradius.com .

Krótki przykład Sinkdziałania:

scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val sink = Sink.foreach[Int](elem => println(s"sink received: $elem"))
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...

scala> val flow = source to sink
flow: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> flow.run()
res3: akka.NotUsed = NotUsed
sink received: 1
sink received: 2
sink received: 3

Połączenie Sourcez a Sinkmożna wykonać za pomocą tej tometody. Zwraca tak zwany RunnableFlow, który jest, jak zobaczymy później, specjalną formą Flow- strumienia, który można wykonać, po prostu wywołując jego run()metodę.

Runnable Flow

Zdjęcie pochodzi z boldradius.com .

Oczywiście możliwe jest przekazanie aktorowi wszystkich wartości, które trafiają do zlewu:

val actor = system.actorOf(Props(new Actor {
  override def receive = {
    case msg => println(s"actor received: $msg")
  }
}))

scala> val sink = Sink.actorRef[Int](actor, onCompleteMessage = "stream completed")
sink: akka.stream.scaladsl.Sink[Int,akka.NotUsed] = ...

scala> val runnable = Source(1 to 3) to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> runnable.run()
res3: akka.NotUsed = NotUsed
actor received: 1
actor received: 2
actor received: 3
actor received: stream completed

Pływ

Źródła danych i ujścia są świetne, jeśli potrzebujesz połączenia między strumieniami Akka i istniejącym systemem, ale tak naprawdę nic z nimi nie możesz zrobić. Przepływy to ostatni brakujący element w abstrakcyjnej bazie strumieni Akka. Działają one jako łącznik między różnymi strumieniami i mogą służyć do przekształcania jego elementów.

Pływ

Zdjęcie pochodzi z boldradius.com .

Jeśli a Flowjest podłączone do Sourcenowego, Sourcepowstaje wynik. Podobnie Flowpołączenie z a Sinktworzy nowy Sink. A w Flowpołączeniu z A Sourcei Sinkwyniki w RunnableFlow. Dlatego znajdują się między kanałem wejściowym a wyjściowym, ale same w sobie nie odpowiadają jednemu ze smaków, o ile nie są podłączone do ani a, Sourceani a Sink.

Pełny strumień

Zdjęcie pochodzi z boldradius.com .

Aby lepiej zrozumieć Flows, przyjrzymy się niektórym przykładom:

scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val sink = Sink.foreach[Int](println)
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...

scala> val invert = Flow[Int].map(elem => elem * -1)
invert: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...

scala> val doubler = Flow[Int].map(elem => elem * 2)
doubler: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...

scala> val runnable = source via invert via doubler to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> runnable.run()
res10: akka.NotUsed = NotUsed
-2
-4
-6

Poprzez viametodę możemy podłączyć Sourcez Flow. Musimy określić typ danych wejściowych, ponieważ kompilator nie może nas o tym wnioskować. Jak możemy zobaczyć już w tym prostym przykładzie, przepływy inverti doublesą całkowicie niezależne od jakichkolwiek danych producentów i konsumentów. Przekształcają tylko dane i przekazują je do kanału wyjściowego. Oznacza to, że możemy ponownie wykorzystać przepływ między wieloma strumieniami:

scala> val s1 = Source(1 to 3) via invert to sink
s1: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> val s2 = Source(-3 to -1) via invert to sink
s2: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> s1.run()
res10: akka.NotUsed = NotUsed
-1
-2
-3

scala> s2.run()
res11: akka.NotUsed = NotUsed
3
2
1

s1i s2reprezentują zupełnie nowe strumienie - nie współużytkują żadnych danych przez swoje bloki konstrukcyjne.

Nieograniczone strumienie danych

Zanim przejdziemy dalej, powinniśmy ponownie zapoznać się z niektórymi kluczowymi aspektami strumieni reaktywnych. Nieograniczona liczba elementów może dotrzeć w dowolnym punkcie i może umieścić strumień w różnych stanach. Oprócz strumienia uruchomialnego, który jest zwykłym stanem, strumień może zostać zatrzymany albo przez błąd, albo przez sygnał, który oznacza, że ​​nie dotrą żadne dalsze dane. Strumień można modelować w sposób graficzny, zaznaczając zdarzenia na osi czasu, tak jak ma to miejsce:

Pokazuje, że strumień jest sekwencją trwających zdarzeń uporządkowanych w czasie

Zdjęcie zaczerpnięte z wprowadzenia do programowania reaktywnego, którego brakowało .

Widzieliśmy już możliwe do uruchomienia przepływy w przykładach poprzedniej sekcji. Otrzymujemy RunnableGraphilekroć rzeczywiście można zmaterializować strumień, co oznacza, że ​​a Sinkjest podłączony do Source. Do tej pory zawsze materializowaliśmy się do wartości Unit, którą można zobaczyć w typach:

val source: Source[Int, NotUsed] = Source(1 to 3)
val sink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)
val flow: Flow[Int, Int, NotUsed] = Flow[Int].map(x => x)

For Sourcei Sinkparametr drugiego typu oraz dla Flowparametru trzeciego typu oznaczają zmaterializowaną wartość. W tej odpowiedzi pełne znaczenie materializacji nie zostanie wyjaśnione. Jednak dalsze szczegóły na temat materializacji można znaleźć w oficjalnej dokumentacji . Na razie jedyne, co musimy wiedzieć, to to, że zmaterializowana wartość jest tym, co otrzymujemy, gdy uruchamiamy strumień. Ponieważ do tej pory byliśmy zainteresowani tylko efektami ubocznymi, otrzymaliśmy Unitwartość zmaterializowaną. Wyjątkiem było zmaterializowanie zlewu, co spowodowało:Future . Dało nam toFuture, ponieważ ta wartość może oznaczać zakończenie strumienia połączonego z ujściem. Dotychczasowe przykłady kodu były przyjemne do wyjaśnienia tej koncepcji, ale były również nudne, ponieważ zajmowaliśmy się tylko strumieniami skończonymi lub bardzo prostymi nieskończonymi. Aby było bardziej interesująco, poniżej zostanie wyjaśniony pełny strumień asynchroniczny i nieograniczony.

Przykład ClickStream

Na przykład chcemy mieć strumień, który przechwytuje zdarzenia kliknięcia. Aby było to trudniejsze, powiedzmy, że chcemy również grupować zdarzenia kliknięcia, które mają miejsce w krótkim czasie po sobie. W ten sposób możemy łatwo odkryć podwójne, potrójne lub dziesięciokrotne kliknięcia. Ponadto chcemy odfiltrować wszystkie pojedyncze kliknięcia. Weź głęboki oddech i wyobraź sobie, jak rozwiązałbyś ten problem w sposób nadrzędny. Założę się, że nikt nie byłby w stanie wdrożyć rozwiązania, które działa poprawnie przy pierwszej próbie. Reaktywnie ten problem jest prosty do rozwiązania. W rzeczywistości rozwiązanie jest tak proste i łatwe do wdrożenia, że ​​możemy nawet wyrazić to na diagramie, który bezpośrednio opisuje zachowanie kodu:

Logika przykładu strumienia kliknięć

Zdjęcie zaczerpnięte z wprowadzenia do programowania reaktywnego, którego brakowało .

Szare pola to funkcje opisujące, w jaki sposób jeden strumień jest przekształcany w inny. Dzięki throttlefunkcji gromadzimy kliknięcia w ciągu 250 milisekund, mapa filterfunkcje i powinny być zrozumiałe. Kolorowe kule reprezentują zdarzenie, a strzałki pokazują, jak przepływają one przez nasze funkcje. Później na etapach przetwarzania otrzymujemy coraz mniej elementów przepływających przez nasz strumień, ponieważ grupujemy je razem i odfiltrowujemy. Kod tego obrazu wyglądałby mniej więcej tak:

val multiClickStream = clickStream
    .throttle(250.millis)
    .map(clickEvents => clickEvents.length)
    .filter(numberOfClicks => numberOfClicks >= 2)

Całą logikę można przedstawić tylko w czterech wierszach kodu! W Scali moglibyśmy napisać to jeszcze krócej:

val multiClickStream = clickStream.throttle(250.millis).map(_.length).filter(_ >= 2)

Definicja clickStreamjest nieco bardziej złożona, ale dzieje się tak tylko dlatego, że przykładowy program działa na JVM, gdzie przechwytywanie zdarzeń kliknięcia nie jest łatwe. Inną komplikacją jest to, że Akka domyślnie nie zapewnia tej throttlefunkcji. Zamiast tego musieliśmy to napisać sami. Ponieważ ta funkcja jest (jak ma to miejsce w przypadku funkcji maplub filter) wielokrotnego użytku w różnych przypadkach użycia, nie liczę tych linii do liczby linii potrzebnych do implementacji logiki. Jednak w imperatywnych językach jest rzeczą normalną, że logiki nie można użyć tak łatwo i że różne logiczne kroki dzieją się w jednym miejscu, zamiast być stosowane sekwencyjnie, co oznacza, że ​​prawdopodobnie zmienilibyśmy nasz kod z logiką dławienia. Pełny przykładowy kod jest dostępny jakoIstota i nie będzie tu więcej omawiana.

Przykład SimpleWebServer

Zamiast tego należy omówić inny przykład. Chociaż strumień kliknięć jest dobrym przykładem pozwalającym strumieniom Akka obsługiwać przykład z prawdziwego świata, brakuje mu mocy do pokazania równoległego wykonywania w akcji. Następny przykład powinien reprezentować mały serwer WWW, który może obsługiwać wiele żądań równolegle. Serwer WWW powinien akceptować połączenia przychodzące i odbierać od nich sekwencje bajtów reprezentujące drukowalne znaki ASCII. Te sekwencje bajtów lub ciągi znaków powinny być podzielone przy wszystkich znakach nowej linii na mniejsze części. Następnie serwer odpowiada klientowi każdą z linii podziału. Alternatywnie może zrobić coś innego z liniami i dać specjalny token odpowiedzi, ale chcemy uprościć ten przykład i dlatego nie wprowadzamy żadnych wymyślnych funkcji. Zapamiętaj, serwer musi być w stanie obsłużyć wiele żądań jednocześnie, co w zasadzie oznacza, że ​​żadne żądanie nie może blokować żadnego innego żądania z dalszego wykonania. Rozwiązanie wszystkich tych wymagań może być trudne w bezwzględny sposób - jednak w przypadku strumieni Akka nie powinniśmy potrzebować więcej niż kilku linii do rozwiązania któregokolwiek z nich. Najpierw omówmy sam serwer:

serwer

Zasadniczo istnieją tylko trzy główne elementy składowe. Pierwszy musi akceptować połączenia przychodzące. Drugi musi obsłużyć przychodzące żądania, a trzeci musi wysłać odpowiedź. Wdrożenie wszystkich tych trzech elementów jest tylko trochę bardziej skomplikowane niż wdrożenie strumienia kliknięć:

def mkServer(address: String, port: Int)(implicit system: ActorSystem, materializer: Materializer): Unit = {
  import system.dispatcher

  val connectionHandler: Sink[Tcp.IncomingConnection, Future[Unit]] =
    Sink.foreach[Tcp.IncomingConnection] { conn =>
      println(s"Incoming connection from: ${conn.remoteAddress}")
      conn.handleWith(serverLogic)
    }

  val incomingCnnections: Source[Tcp.IncomingConnection, Future[Tcp.ServerBinding]] =
    Tcp().bind(address, port)

  val binding: Future[Tcp.ServerBinding] =
    incomingCnnections.to(connectionHandler).run()

  binding onComplete {
    case Success(b) =>
      println(s"Server started, listening on: ${b.localAddress}")
    case Failure(e) =>
      println(s"Server could not be bound to $address:$port: ${e.getMessage}")
  }
}

Funkcja mkServerprzyjmuje (oprócz adresu i portu serwera) także system aktorów i materializator jako parametry niejawne. Przepływ sterowania serwera jest reprezentowany przez binding, który pobiera źródło połączeń przychodzących i przekazuje je do ujścia połączeń przychodzących. Wewnątrz connectionHandler, który jest naszym zlewem, obsługujemy każde połączenie przepływem serverLogic, które zostanie opisane później. bindingzwraca aFuture, która kończy się, gdy serwer został uruchomiony lub uruchomienie nie powiodło się, co może mieć miejsce, gdy port jest już zajęty przez inny proces. Kod nie odzwierciedla jednak całkowicie grafiki, ponieważ nie widzimy bloku konstrukcyjnego, który obsługuje odpowiedzi. Powodem tego jest to, że połączenie samo zapewnia tę logikę. Jest to przepływ dwukierunkowy, a nie tylko jednokierunkowy, jak widzieliśmy w poprzednich przykładach. Tak jak w przypadku materializacji, takich złożonych przepływów nie należy tutaj wyjaśniać. Oficjalna dokumentacja ma mnóstwo materiału do pokrycia bardziej złożonych wykresów przepływu. Na razie wystarczy wiedzieć, że Tcp.IncomingConnectionreprezentuje połączenie, które wie, jak odbierać żądania i jak wysyłać odpowiedzi. Część, której wciąż brakuje, toserverLogicblok konstrukcyjny. Może to wyglądać tak:

logika serwera

Po raz kolejny jesteśmy w stanie podzielić logikę na kilka prostych elementów składowych, które razem tworzą przepływ naszego programu. Najpierw chcemy podzielić naszą sekwencję bajtów na linie, co musimy zrobić, gdy znajdziemy znak nowej linii. Następnie bajty każdej linii należy przekonwertować na ciąg, ponieważ praca z bajtami surowymi jest uciążliwa. Ogólnie rzecz biorąc, moglibyśmy otrzymać binarny strumień skomplikowanego protokołu, co sprawiłoby, że praca z przychodzącymi surowymi danymi była niezwykle trudna. Gdy mamy czytelny ciąg, możemy utworzyć odpowiedź. Dla uproszczenia odpowiedzią może być w naszym przypadku cokolwiek. Na koniec musimy przekonwertować naszą odpowiedź na sekwencję bajtów, które można wysłać przewodowo. Kod całej logiki może wyglądać następująco:

val serverLogic: Flow[ByteString, ByteString, Unit] = {
  val delimiter = Framing.delimiter(
    ByteString("\n"),
    maximumFrameLength = 256,
    allowTruncation = true)

  val receiver = Flow[ByteString].map { bytes =>
    val message = bytes.utf8String
    println(s"Server received: $message")
    message
  }

  val responder = Flow[String].map { message =>
    val answer = s"Server hereby responds to message: $message\n"
    ByteString(answer)
  }

  Flow[ByteString]
    .via(delimiter)
    .via(receiver)
    .via(responder)
}

Wiemy już, że serverLogicjest to przepływ, który wymaga ByteStringi musi wytworzyć ByteString. Dzięki delimitermożemy podzielić na ByteStringmniejsze części - w naszym przypadku musi to nastąpić za każdym razem, gdy pojawi się znak nowej linii. receiverto przepływ, który bierze wszystkie sekwencje podzielonych bajtów i konwertuje je na ciąg. Jest to oczywiście niebezpieczna konwersja, ponieważ tylko drukowalne znaki ASCII powinny być konwertowane na ciąg znaków, ale na nasze potrzeby jest wystarczająca. responderjest ostatnim komponentem i odpowiada za utworzenie odpowiedzi i konwersję odpowiedzi z powrotem do sekwencji bajtów. W przeciwieństwie do grafiki nie podzieliliśmy tego ostatniego komponentu na dwie części, ponieważ logika jest trywialna. Na koniec łączymy wszystkie przepływy przezviafunkcjonować. W tym momencie można zapytać, czy zadbaliśmy o wspomnianą na początku własność dla wielu użytkowników. I rzeczywiście tak zrobiliśmy, chociaż może nie być to od razu oczywiste. Patrząc na tę grafikę, powinna stać się wyraźniejsza:

serwer i logika serwera połączone

serverLogicSkładnikiem jest niczym strumień, który zawiera mniejsze przepływy. Ten komponent pobiera dane wejściowe, które są żądaniami, i generuje dane wyjściowe, które są odpowiedzią. Ponieważ przepływy można konstruować wiele razy i wszystkie działają niezależnie od siebie, osiągamy to poprzez zagnieżdżanie naszej właściwości dla wielu użytkowników. Każde żądanie jest obsługiwane w ramach własnego żądania, dlatego też żądanie krótko działające może zastąpić wcześniej uruchomione żądanie długo działające. Jeśli zastanawiasz się, definicję serverLogictego pokazaną wcześniej można oczywiście napisać znacznie krócej, wstawiając większość jej wewnętrznych definicji:

val serverLogic = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .map(msg => s"Server hereby responds to message: $msg\n")
  .map(ByteString(_))

Test serwera WWW może wyglądać następująco:

$ # Client
$ echo "Hello World\nHow are you?" | netcat 127.0.0.1 6666
Server hereby responds to message: Hello World
Server hereby responds to message: How are you?

Aby powyższy przykład kodu działał poprawnie, najpierw musimy uruchomić serwer, który jest przedstawiony przez startServerskrypt:

$ # Server
$ ./startServer 127.0.0.1 6666
[DEBUG] Server started, listening on: /127.0.0.1:6666
[DEBUG] Incoming connection from: /127.0.0.1:37972
[DEBUG] Server received: Hello World
[DEBUG] Server received: How are you?

Pełny przykład kodu tego prostego serwera TCP można znaleźć tutaj . Jesteśmy w stanie napisać serwer za pomocą Akka Streams, ale także klienta. Może to wyglądać tak:

val connection = Tcp().outgoingConnection(address, port)
val flow = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .map(println)
  .map(_ ⇒ StdIn.readLine("> "))
  .map(_+"\n")
  .map(ByteString(_))

connection.join(flow).run()

Pełny kod klienta TCP można znaleźć tutaj . Kod wygląda dość podobnie, ale w przeciwieństwie do serwera nie musimy już zarządzać połączeniami przychodzącymi.

Złożone wykresy

W poprzednich sekcjach widzieliśmy, jak możemy konstruować proste programy z przepływów. Jednak w rzeczywistości często nie wystarczy polegać na już wbudowanych funkcjach, aby konstruować bardziej złożone strumienie. Jeśli chcemy mieć możliwość korzystania ze strumieni Akka do dowolnych programów, musimy wiedzieć, jak zbudować własne niestandardowe struktury sterowania i kombinowalne przepływy, które pozwolą nam uporać się ze złożonością naszych aplikacji. Dobrą wiadomością jest to, że strumienie Akka zostały zaprojektowane w taki sposób, aby skalować je w zależności od potrzeb użytkowników. Aby dać krótkie wprowadzenie do bardziej złożonych części strumieni Akka, dodajemy więcej funkcji do naszego przykładu klient / serwer.

Jedną rzeczą, której nie możemy jeszcze zrobić, jest zamknięcie połączenia. W tym momencie zaczyna się nieco komplikować, ponieważ interfejs API strumienia, który widzieliśmy do tej pory, nie pozwala nam zatrzymać strumienia w dowolnym miejscu. Istnieje jednak GraphStageabstrakcja, za pomocą której można tworzyć dowolne etapy przetwarzania wykresów z dowolną liczbą portów wejściowych lub wyjściowych. Przyjrzyjmy się najpierw stronie serwera, gdzie wprowadzamy nowy komponent o nazwie closeConnection:

val closeConnection = new GraphStage[FlowShape[String, String]] {
  val in = Inlet[String]("closeConnection.in")
  val out = Outlet[String]("closeConnection.out")

  override val shape = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
    setHandler(in, new InHandler {
      override def onPush() = grab(in) match {
        case "q" ⇒
          push(out, "BYE")
          completeStage()
        case msg ⇒
          push(out, s"Server hereby responds to message: $msg\n")
      }
    })
    setHandler(out, new OutHandler {
      override def onPull() = pull(in)
    })
  }
}

Ten interfejs API wygląda o wiele bardziej skomplikowany niż interfejs API flow. Nic dziwnego, że musimy tutaj zrobić wiele koniecznych kroków. W zamian mamy większą kontrolę nad zachowaniem naszych strumieni. W powyższym przykładzie podajemy tylko jeden port wejściowy i jeden port wyjściowy i udostępniamy je systemowi, zastępując shapewartość. Ponadto zdefiniowaliśmy tak zwane InHandlera OutHandler, które w tej kolejności odpowiadają za odbieranie i wysyłanie elementów. Jeśli przyjrzałeś się przykładowi pełnego strumienia kliknięć, powinieneś już rozpoznać te komponenty. W InHandlerchwytamy element i jeśli jest to ciąg znaków z jednym znakiem 'q', chcemy zamknąć strumień. Aby dać klientowi szansę dowiedzieć się, że strumień zostanie wkrótce zamknięty, emitujemy ciąg"BYE"a następnie natychmiast zamykamy scenę. closeConnectionSkładnik może być łączony ze strumieniem za pomocą viasposobu, który został wprowadzony w części o przepływach.

Oprócz możliwości zamykania połączeń, byłoby również miło, gdybyśmy mogli wyświetlić wiadomość powitalną dla nowo utworzonego połączenia. Aby to zrobić, musimy jeszcze raz pójść nieco dalej:

def serverLogic
    (conn: Tcp.IncomingConnection)
    (implicit system: ActorSystem)
    : Flow[ByteString, ByteString, NotUsed]
    = Flow.fromGraph(GraphDSL.create() { implicit b ⇒
  import GraphDSL.Implicits._
  val welcome = Source.single(ByteString(s"Welcome port ${conn.remoteAddress}!\n"))
  val logic = b.add(internalLogic)
  val concat = b.add(Concat[ByteString]())
  welcome ~> concat.in(0)
  logic.outlet ~> concat.in(1)

  FlowShape(logic.in, concat.out)
})

Funkcja serverLogic przyjmuje teraz połączenie przychodzące jako parametr. Wewnątrz jego ciała używamy DSL, który pozwala nam opisać złożone zachowanie strumienia. Dzięki temu welcometworzymy strumień, który może emitować tylko jeden element - wiadomość powitalną. logicjest tak, jak opisano serverLogicw poprzedniej sekcji. Jedyną zauważalną różnicą jest to, że closeConnectiondo tego dodaliśmy . Teraz faktycznie nadchodzi interesująca część DSL. Ta GraphDSL.createfunkcja udostępnia konstruktora b, który służy do wyrażania strumienia jako wykresu. Za pomocą tej ~>funkcji można ze sobą łączyć porty wejściowe i wyjściowe. ConcatSkładnik, który jest używany w tym przykładzie można łączyć elementy i jest tu stosowany do poprzedź wiadomość powitalną przed innymi elementami, które pochodzą zinternalLogic. W ostatnim wierszu udostępniamy tylko port wejściowy logiki serwera i port wyjściowy konkatenowanego strumienia, ponieważ wszystkie pozostałe porty powinny pozostać szczegółami implementacji serverLogickomponentu. Aby uzyskać szczegółowe wprowadzenie do wykresu DSL strumieni Akka, odwiedź odpowiednią sekcję w oficjalnej dokumentacji . Pełny przykład kodu złożonego serwera TCP i klienta, który może się z nim komunikować, można znaleźć tutaj . Za każdym razem, gdy otwierasz nowe połączenie od klienta, powinieneś zobaczyć wiadomość powitalną, a po wpisaniu "q"na kliencie powinieneś zobaczyć komunikat informujący, że połączenie zostało anulowane.

Wciąż jest kilka tematów, które nie zostały ujęte w tej odpowiedzi. Zwłaszcza materializacja może przestraszyć jednego lub drugiego czytelnika, ale jestem pewien, że dzięki omawianemu tutaj materiałowi każdy powinien być w stanie samodzielnie przejść kolejne kroki. Jak już wspomniano, oficjalna dokumentacja jest dobrym miejscem do dalszej nauki o strumieniach Akka.

kiritsuku
źródło
4
@monksy Nie planowałem publikować tego nigdzie indziej. Jeśli chcesz, możesz ponownie opublikować to na swoim blogu. Interfejs API jest obecnie stabilny w większości części, co oznacza, że ​​prawdopodobnie nie musisz nawet przejmować się konserwacją (większość artykułów na blogu o strumieniach Akka jest już nieaktualna, ponieważ pokazują one interfejs API, który już nie istnieje).
kiritsuku
3
Nie zniknie. Dlaczego to powinno?
kiritsuku
2
@sschaef Może zniknąć, ponieważ pytanie jest nie na temat i jako takie zostało zamknięte.
DavidG
7
@Magisch Zawsze pamiętaj: „Nie usuwamy dobrych treści”. Nie jestem do końca pewien, ale sądzę, że mimo wszystko ta odpowiedź może się kwalifikować.
Deduplicator
9
Ten post może być przydatny dla nowej funkcji Dokumentacja przepełnienia stosu - po otwarciu dla Scali.
SL Barth - Przywróć Monikę