Powiedzmy, że żądam dużego pliku json, który zawiera listę wielu obiektów. Nie chcę, aby były w pamięci od razu, ale wolałbym je czytać i przetwarzać jeden po drugim. Więc muszę przekształcić System.IO.Stream
strumień asynchroniczny w plik IAsyncEnumerable<T>
. Jak do tego użyć nowego System.Text.Json
interfejsu API?
private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
using (var httpResponse = await httpClient.GetAsync(url, cancellationToken))
{
using (var stream = await httpResponse.Content.ReadAsStreamAsync())
{
// Probably do something with JsonSerializer.DeserializeAsync here without serializing the entire thing in one go
}
}
}
c#
.net-core
.net-core-3.0
c#-8.0
system.text.json
Rick de Water
źródło
źródło
Utf8JsonReader
, proszę spojrzeć na niektóre GitHub próbkach i na istniejącym wątku , a takżeGetAsync
sam powraca po otrzymaniu całej odpowiedzi. MusiszSendAsync
zamiast tego używać z `HttpCompletionOption.ResponseContentRead`. Gdy już to zrobisz, możesz użyć JsonTextReadera JSON.NET . KorzystanieSystem.Text.Json
z tego nie jest tak łatwe, jak pokazuje ten problem . Ta funkcjonalność nie jest dostępna, a wdrożenie jej w niskim przydziale za pomocą struktur nie jest banalneOdpowiedzi:
Tak, prawdziwie strumieniujący (de) serializator JSON byłby fajną poprawą wydajności w tak wielu miejscach.
Niestety
System.Text.Json
obecnie tego nie robi. Nie jestem pewien, czy tak będzie w przyszłości - mam taką nadzieję! Naprawdę strumieniowanie deserializacji JSONa okazuje się być dość trudne.Być może mógłbyś sprawdzić, czy wyjątkowo szybki Utf8Json go obsługuje.
Jednak może istnieć niestandardowe rozwiązanie dla konkretnej sytuacji, ponieważ wymagania wydają się ograniczać trudność.
Chodzi o to, aby ręcznie odczytać jeden element z tablicy na raz. Korzystamy z faktu, że każdy element na liście sam w sobie jest prawidłowym obiektem JSON.
Możesz ręcznie pominąć
[
(dla pierwszego elementu) lub,
(dla każdego następnego elementu). Zatem myślę, że najlepszym rozwiązaniem jest użycie platformy .NET CoreUtf8JsonReader
do ustalenia, gdzie kończy się bieżący obiekt, i przesłanie zeskanowanych bajtówJsonDeserializer
.W ten sposób buforujesz tylko nieznacznie jeden obiekt naraz.
A ponieważ mówimy o wydajności, możesz uzyskać dane wejściowe z
PipeReader
, gdy jesteś przy tym. :-)źródło
TL; DR To nie jest banalne
Wygląda jak ktoś już pisał pełny kod dla
Utf8JsonStreamReader
struktury, które odczytuje bufory ze strumienia i przekazuje je do Utf8JsonRreader, umożliwiając łatwy deserializacji zJsonSerializer.Deserialize<T>(ref newJsonReader, options);
. Kod też nie jest trywialny. Powiązane pytanie jest tutaj, a odpowiedź jest tutaj .To jednak nie wystarczy -
HttpClient.GetAsync
powróci dopiero po otrzymaniu całej odpowiedzi, zasadniczo buforując wszystko w pamięci.Aby tego uniknąć, należy używać HttpClient.GetAsync (ciąg, HttpCompletionOption) z
HttpCompletionOption.ResponseHeadersRead
.Pętla deserializacji powinna również sprawdzić token anulowania i wyjść lub rzucić, jeśli jest sygnalizowany. W przeciwnym razie pętla będzie działać, dopóki cały strumień nie zostanie odebrany i przetworzony.
Ten kod jest oparty na przykładzie pokrewnej odpowiedzi i wykorzystuje
HttpCompletionOption.ResponseHeadersRead
i sprawdza token anulowania. Może analizować ciągi JSON, które zawierają odpowiednią tablicę elementów, np .:Pierwsze wywołanie
jsonStreamReader.Read()
przesuwa się na początek tablicy, a drugie przesuwa się na początek pierwszego obiektu. Sama pętla kończy się po]
wykryciu końca tablicy ( ).Fragmenty JSON, AKA streaming JSON aka ... *
Dość często zdarza się, że w scenariuszach przesyłania strumieniowego lub rejestrowania zdarzeń poszczególne obiekty JSON są dołączane do pliku, po jednym elemencie w wierszu, np .:
To nie jest prawidłowy dokument JSON, ale poszczególne fragmenty są prawidłowe. Ma to kilka zalet w przypadku dużych zbiorów danych / wysoce współbieżnych scenariuszy. Dodanie nowego zdarzenia wymaga jedynie dodania nowego wiersza do pliku, a nie analizowania i przebudowywania całego pliku. Przetwarzanie , zwłaszcza przetwarzanie równoległe, jest łatwiejsze z dwóch powodów:
Korzystanie z StreamReadera
Aby to zrobić, należy użyć TextReadera, czytać jeden wiersz na raz i parsować go za pomocą JsonSerializer.Deserialize :
Jest to o wiele prostsze niż kod, który deserializuje odpowiednią tablicę. Istnieją dwa problemy:
ReadLineAsync
nie akceptuje tokenu anulowaniaMoże to wystarczyć, ponieważ próba wytworzenia
ReadOnlySpan<Byte>
buforów wymaganych przez JsonSerializer.Deserialize nie jest trywialna.Rurociągi i czytnik sekwencji
Aby uniknąć alokacji, musimy pobrać
ReadOnlySpan<byte>
ze strumienia. Wykonanie tego wymaga użycia potoków System.IO.Pipeline i struktury SequenceReader . Steve Gordon's An Introduction to SequenceReader wyjaśnia, w jaki sposób można wykorzystać tę klasę do odczytu danych ze strumienia przy użyciu ograniczników.Niestety
SequenceReader
jest strukturą referencyjną, co oznacza, że nie można jej używać w metodach asynchronicznych ani lokalnych. Dlatego Steve Gordon w swoim artykule tworzymetoda odczytu elementów z ReadOnlySequence i zwrócenie pozycji końcowej, aby PipeReader mógł z niej wznowić. Niestety chcemy zwrócić IEnumerable lub IAsyncEnumerable, a metody iteratora nie lubią parametrów ani
in
teżout
parametrów.Możemy zebrać deserializowane elementy z Listy lub Kolejki i zwrócić je jako pojedynczy wynik, ale to nadal przydzieli listy, bufory lub węzły i będziemy musieli poczekać, aż wszystkie elementy w buforze zostaną odserializowane przed zwróceniem:
Potrzebujemy czegoś , co zachowuje się jak wyliczenie, nie wymagając metody iteratora, działa z asynchronizacją i nie buforuje wszystkiego.
Dodawanie kanałów w celu utworzenia IAsyncEnumerable
ChannelReader.ReadAllAsync zwraca IAsyncEnumerable. Możemy zwrócić ChannelReader z metod, które nie mogłyby działać jako iteratory i nadal generować strumień elementów bez buforowania.
Dostosowując kod Steve'a Gordona do korzystania z kanałów, otrzymujemy ReadItems (ChannelWriter ...) i
ReadLastItem
metody. Pierwszy z nich odczytuje jeden element na raz, aż do nowej linii za pomocąReadOnlySpan<byte> itemBytes
. Może to być wykorzystane przezJsonSerializer.Deserialize
. JeśliReadItems
nie może znaleźć separatora, zwraca swoją pozycję, aby PipelineReader mógł pobrać następny fragment ze strumienia.Kiedy dotrzemy do ostatniego fragmentu i nie będzie już innego ogranicznika, ReadLastItem` odczytuje pozostałe bajty i deserializuje je.
Kod jest prawie identyczny z kodem Steve'a Gordona. Zamiast pisać do konsoli piszemy do ChannelWriter.
DeserializeToChannel<T>
Metoda stwarza czytelnikowi rurociągu na górze strumienia, tworzy kanał i rozpoczyna zadanie pracownika, który analizuje kawałki i popycha je do kanału:ChannelReader.ReceiveAllAsync()
może być wykorzystany do zużycia wszystkich przedmiotów poprzezIAsyncEnumerable<T>
:źródło
Wydaje się, że musisz zaszczepić własny czytnik strumieniowy. Musisz czytać bajty jeden po drugim i zatrzymać się, gdy tylko zakończy się definicja obiektu. To jest naprawdę dość niski poziom. Jako taki NIE BĘDZIESZ ładować całego pliku do pamięci RAM, ale raczej weź udział, z którym masz do czynienia. Czy to wydaje się być odpowiedzią?
źródło
Może mógłbyś użyć
Newtonsoft.Json
serializatora? https://www.newtonsoft.com/json/help/html/Performance.htmSzczególnie patrz sekcja:
Edytować
Możesz spróbować deserializować wartości z JsonTextReader, np
źródło
I don't want them to be in memory all at once, but I would rather read and process them one by one.
Odpowiednią klasą w JSON.NET jest JsonTextReader.