przekazując strumień Akka do usługi upstream w celu wypełnienia

9

Muszę zadzwonić do usługi nadrzędnej (Azure Blob Service), aby przekazać dane do OutputStream, który następnie muszę zawrócić i odesłać z powrotem do klienta, poprzez akka. Bez akka (i tylko kodu serwletu) po prostu wziąłbym ServletOutputStream i przekazałem go do metody usługi lazurowej.

Najbliższe, na jakie mogę się natknąć, i oczywiście jest to złe, to coś takiego

        Source<ByteString, OutputStream> source = StreamConverters.asOutputStream().mapMaterializedValue(os -> {
            blobClient.download(os);
            return os;
        });

        ResponseEntity resposeEntity = HttpEntities.create(ContentTypes.APPLICATION_OCTET_STREAM, preAuthData.getFileSize(), source);

        sender().tell(new RequestResult(resposeEntity, StatusCodes.OK), self());

Chodzi o to, że dzwonię do usługi nadrzędnej, aby zapełnić strumień wyjściowy, wywołując funkcję blobClient.download (os);

Wygląda na to, że funkcja lambda zostaje wywołana i powraca, ale potem kończy się niepowodzeniem, ponieważ nie ma żadnych danych ani czegoś takiego. Jakbym nie miał mieć tej funkcji lambda, ale może zwróci jakiś obiekt, który wykonuje tę pracę? Niepewny.

Jak to zrobić?

MeBigFatGuy
źródło
Jakie jest zachowanie download? Czy przesyła strumieniowo dane osi zwraca je dopiero po zakończeniu zapisywania danych?
Alec

Odpowiedzi:

2

Prawdziwy problem polega na tym, że interfejs API platformy Azure nie jest przeznaczony do presji wstecznej. Strumień wyjściowy nie może zasygnalizować z powrotem do platformy Azure, że nie jest gotowy na więcej danych. Innymi słowy: jeśli platforma Azure wypycha dane szybciej, niż jesteś w stanie je zużyć, będzie musiała gdzieś wystąpić brzydka przepełnienie bufora.

Akceptując ten fakt, następną najlepszą rzeczą, jaką możemy zrobić, jest:

  • Służy Source.lazySourcedo rozpoczynania pobierania danych tylko wtedy, gdy jest zapotrzebowanie na dalsze dane (np. Uruchamiane jest źródło i żądane są dane).
  • Umieść downloadwywołanie w innym wątku, aby kontynuowało wykonywanie bez blokowania zwracania źródła. Raz można to zrobić za pomocą Future(nie jestem pewien, jakie są najlepsze praktyki Java, ale powinny działać dobrze tak czy inaczej). Chociaż na początku nie będzie to miało znaczenia, może być konieczne wybranie kontekstu wykonania innego niż system.dispatcher- wszystko zależy od tego, czy downloadblokuje, czy nie.

Z góry przepraszam, jeśli ten kod Java jest źle sformułowany - używam Akka ze Scalą, więc to wszystko z powodu spojrzenia na API Akka Java i odwołanie do składni Java.

ResponseEntity responseEntity = HttpEntities.create(
  ContentTypes.APPLICATION_OCTET_STREAM,
  preAuthData.getFileSize(),

  // Wait until there is downstream demand to intialize the source...
  Source.lazySource(() -> {
    // Pre-materialize the outputstream before the source starts running
    Pair<OutputStream, Source<ByteString, NotUsed>> pair =
      StreamConverters.asOutputStream().preMaterialize(system);

    // Start writing into the download stream in a separate thread
    Futures.future(() -> { blobClient.download(pair.first()); return pair.first(); }, system.getDispatcher());

    // Return the source - it should start running since `lazySource` indicated demand
    return pair.second();
  })
);

sender().tell(new RequestResult(responseEntity, StatusCodes.OK), self());
Alec
źródło
Fantastyczny. dzięki wielkie. Niewielka zmiana w twoim przykładzie to: Futures.future (() -> {blobClient.download (pair.first ()); return pair.first ();}, system.getDispatcher ());
MeBigFatGuy
@MeBigFatGuy Racja, dzięki!
Alec
1

W OutputStreamtym przypadku jest to „zmaterializowana wartość” Sourcei zostanie ona utworzona dopiero po uruchomieniu strumienia (lub „zmaterializowaniu” się w działającym strumieniu). Uruchamianie go jest poza twoją kontrolą, ponieważ przekazujesz go Sourcedo Akka HTTP, a to później faktycznie uruchomi twoje źródło.

.mapMaterializedValue(matval -> ...)jest zwykle używany do przekształcania zmaterializowanej wartości, ale ponieważ jest on wywoływany w ramach materializacji, możesz go użyć do wykonania efektów ubocznych, takich jak wysłanie matval w wiadomości, tak jak się zorientowałeś, niekoniecznie nic złego jest z że nawet jeśli wygląda fajnie. Ważne jest, aby zrozumieć, że strumień nie dokończy materializacji i zacznie działać, dopóki lambda się nie zakończy. Oznacza to problemy, jeśli download()blokuje, a nie przerywa pracę nad innym wątkiem i natychmiast wraca.

Istnieje jednak inne rozwiązanie: Source.preMaterialize()materializuje źródło i daje Pairzmaterializowaną wartość oraz nowe, Sourcektóre można wykorzystać do wykorzystania już uruchomionego źródła:

Pair<OutputStream, Source<ByteString, NotUsed>> pair = 
  StreamConverters.asOutputStream().preMaterialize(system);
OutputStream os = pair.first();
Source<ByteString, NotUsed> source = pair.second();

Zauważ, że w twoim kodzie jest kilka dodatkowych rzeczy, o których należy pamiętać, przede wszystkim, jeśli blobClient.download(os)wywołanie blokuje się, dopóki nie zostanie wykonane, a ty wywołujesz to od aktora, w takim przypadku musisz upewnić się, że twój aktor nie zagłodzi dyspozytora i nie przestanie inne podmioty w Twojej aplikacji od wykonania (patrz dokumenty Akka: https://doc.akka.io/docs/akka/current/typed/dispatchers.html#blocking-needs-careful-management ).

johanandren
źródło
1
Dzięki za odpowiedzi. Nie rozumiem, jak to może działać? gdzie idą bajty, gdy wywoływany jest plik blobClient.download (os) (jeśli sam go nazywam)? Wyobraź sobie, że czeka na zapis terabajt danych. wydaje mi się, że wywołanie blobClient.download musi zostać wywołane z wywołania sender.tell, aby była to w zasadzie operacja typu IOUtils.copy. Używając PreMaterialize, nie widzę, jak to się dzieje?
MeBigFatGuy
OutputStream ma wewnętrzny bufor, zacznie akceptować zapisy, dopóki bufor się nie zapełni, jeśli asynchroniczny proces pobierania nie zacznie zużywać elementów do tego czasu, zablokuje wątek zapisu (dlatego wspomniałem, że ważne jest, aby obsłużyć blokowanie).
johanandren
1
Ale jeśli wstępnie zmaterializuję i uzyskam OutputStream, to mój kod wykonuje blobClient.download (os); poprawny? Oznacza to, że musi to zakończyć, zanim będę mógł kontynuować, co jest niemożliwe.
MeBigFatGuy
Jeśli pobieranie (os) nie rozwidla wątku, będziesz musiał poradzić sobie z blokowaniem go i upewnić się, że nie zatrzyma to żadnej innej operacji. Jednym ze sposobów byłoby rozwidlenie wątku, aby wykonać pracę, inny najpierw zareagowałby aktor, a następnie wykonał pracę blokującą, w takim przypadku musisz upewnić się, że aktor nie głoduje innych aktorów, patrz link na końcu moja odpowiedź.
johanandren
w tym momencie po prostu próbuję w ogóle go uruchomić. Nie może nawet przetworzyć pliku 10-bajtowego.
MeBigFatGuy