Muszę zadzwonić do usługi nadrzędnej (Azure Blob Service), aby przekazać dane do OutputStream, który następnie muszę zawrócić i odesłać z powrotem do klienta, poprzez akka. Bez akka (i tylko kodu serwletu) po prostu wziąłbym ServletOutputStream i przekazałem go do metody usługi lazurowej.
Najbliższe, na jakie mogę się natknąć, i oczywiście jest to złe, to coś takiego
Source<ByteString, OutputStream> source = StreamConverters.asOutputStream().mapMaterializedValue(os -> {
blobClient.download(os);
return os;
});
ResponseEntity resposeEntity = HttpEntities.create(ContentTypes.APPLICATION_OCTET_STREAM, preAuthData.getFileSize(), source);
sender().tell(new RequestResult(resposeEntity, StatusCodes.OK), self());
Chodzi o to, że dzwonię do usługi nadrzędnej, aby zapełnić strumień wyjściowy, wywołując funkcję blobClient.download (os);
Wygląda na to, że funkcja lambda zostaje wywołana i powraca, ale potem kończy się niepowodzeniem, ponieważ nie ma żadnych danych ani czegoś takiego. Jakbym nie miał mieć tej funkcji lambda, ale może zwróci jakiś obiekt, który wykonuje tę pracę? Niepewny.
Jak to zrobić?
źródło
download
? Czy przesyła strumieniowo daneos
i zwraca je dopiero po zakończeniu zapisywania danych?Odpowiedzi:
Prawdziwy problem polega na tym, że interfejs API platformy Azure nie jest przeznaczony do presji wstecznej. Strumień wyjściowy nie może zasygnalizować z powrotem do platformy Azure, że nie jest gotowy na więcej danych. Innymi słowy: jeśli platforma Azure wypycha dane szybciej, niż jesteś w stanie je zużyć, będzie musiała gdzieś wystąpić brzydka przepełnienie bufora.
Akceptując ten fakt, następną najlepszą rzeczą, jaką możemy zrobić, jest:
Source.lazySource
do rozpoczynania pobierania danych tylko wtedy, gdy jest zapotrzebowanie na dalsze dane (np. Uruchamiane jest źródło i żądane są dane).download
wywołanie w innym wątku, aby kontynuowało wykonywanie bez blokowania zwracania źródła. Raz można to zrobić za pomocąFuture
(nie jestem pewien, jakie są najlepsze praktyki Java, ale powinny działać dobrze tak czy inaczej). Chociaż na początku nie będzie to miało znaczenia, może być konieczne wybranie kontekstu wykonania innego niżsystem.dispatcher
- wszystko zależy od tego, czydownload
blokuje, czy nie.Z góry przepraszam, jeśli ten kod Java jest źle sformułowany - używam Akka ze Scalą, więc to wszystko z powodu spojrzenia na API Akka Java i odwołanie do składni Java.
źródło
W
OutputStream
tym przypadku jest to „zmaterializowana wartość”Source
i zostanie ona utworzona dopiero po uruchomieniu strumienia (lub „zmaterializowaniu” się w działającym strumieniu). Uruchamianie go jest poza twoją kontrolą, ponieważ przekazujesz goSource
do Akka HTTP, a to później faktycznie uruchomi twoje źródło..mapMaterializedValue(matval -> ...)
jest zwykle używany do przekształcania zmaterializowanej wartości, ale ponieważ jest on wywoływany w ramach materializacji, możesz go użyć do wykonania efektów ubocznych, takich jak wysłanie matval w wiadomości, tak jak się zorientowałeś, niekoniecznie nic złego jest z że nawet jeśli wygląda fajnie. Ważne jest, aby zrozumieć, że strumień nie dokończy materializacji i zacznie działać, dopóki lambda się nie zakończy. Oznacza to problemy, jeślidownload()
blokuje, a nie przerywa pracę nad innym wątkiem i natychmiast wraca.Istnieje jednak inne rozwiązanie:
Source.preMaterialize()
materializuje źródło i dajePair
zmaterializowaną wartość oraz nowe,Source
które można wykorzystać do wykorzystania już uruchomionego źródła:Zauważ, że w twoim kodzie jest kilka dodatkowych rzeczy, o których należy pamiętać, przede wszystkim, jeśli
blobClient.download(os)
wywołanie blokuje się, dopóki nie zostanie wykonane, a ty wywołujesz to od aktora, w takim przypadku musisz upewnić się, że twój aktor nie zagłodzi dyspozytora i nie przestanie inne podmioty w Twojej aplikacji od wykonania (patrz dokumenty Akka: https://doc.akka.io/docs/akka/current/typed/dispatchers.html#blocking-needs-careful-management ).źródło