Asynchronicznie deserializowanie listy za pomocą System.Text.Json

11

Powiedzmy, że żądam dużego pliku json, który zawiera listę wielu obiektów. Nie chcę, aby były w pamięci od razu, ale wolałbym je czytać i przetwarzać jeden po drugim. Więc muszę przekształcić System.IO.Streamstrumień asynchroniczny w plik IAsyncEnumerable<T>. Jak do tego użyć nowego System.Text.Jsoninterfejsu API?

private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
    using (var httpResponse = await httpClient.GetAsync(url, cancellationToken))
    {
        using (var stream = await httpResponse.Content.ReadAsStreamAsync())
        {
            // Probably do something with JsonSerializer.DeserializeAsync here without serializing the entire thing in one go
        }
    }
}
Rick de Water
źródło
1
Prawdopodobnie będziesz potrzebować czegoś takiego jak metoda DeserializeAsync
Pavel Anikhouski
2
Niestety wygląda na to, że powyższa metoda ładuje cały strumień do pamięci. Można odczytać danych, których autorem jest kawałkami asynchonously korzystających Utf8JsonReader, proszę spojrzeć na niektóre GitHub próbkach i na istniejącym wątku , a także
Pavel Anikhouski
GetAsyncsam powraca po otrzymaniu całej odpowiedzi. Musisz SendAsynczamiast tego używać z `HttpCompletionOption.ResponseContentRead`. Gdy już to zrobisz, możesz użyć JsonTextReadera JSON.NET . Korzystanie System.Text.Jsonz tego nie jest tak łatwe, jak pokazuje ten problem . Ta funkcjonalność nie jest dostępna, a wdrożenie jej w niskim przydziale za pomocą struktur nie jest banalne
Panagiotis Kanavos
Problem z deserializacją w porcjach polega na tym, że musisz wiedzieć, kiedy masz całą porcję do deserializacji. Trudno byłoby to osiągnąć wyłącznie w ogólnych przypadkach. Wymagałoby to wcześniejszej analizy, co może być dość słabym kompromisem pod względem wydajności. Raczej trudno byłoby uogólnić. Ale jeśli narzucisz swoje własne ograniczenia na JSON, powiedz „pojedynczy obiekt zajmuje dokładnie 20 linii w pliku”, wtedy możesz zasadniczo zdezrializować asynchronicznie, czytając plik asynchronicznie. Wyobrażam sobie, że potrzebowałbyś ogromnego Jsona, aby zobaczyć korzyści tutaj.
DetectivePikachu,
Wygląda na to, że ktoś już odpowiedział na podobne pytanie tutaj z pełnym kodem.
Panagiotis Kanavos

Odpowiedzi:

4

Tak, prawdziwie strumieniujący (de) serializator JSON byłby fajną poprawą wydajności w tak wielu miejscach.

Niestety System.Text.Jsonobecnie tego nie robi. Nie jestem pewien, czy tak będzie w przyszłości - mam taką nadzieję! Naprawdę strumieniowanie deserializacji JSONa okazuje się być dość trudne.

Być może mógłbyś sprawdzić, czy wyjątkowo szybki Utf8Json go obsługuje.

Jednak może istnieć niestandardowe rozwiązanie dla konkretnej sytuacji, ponieważ wymagania wydają się ograniczać trudność.

Chodzi o to, aby ręcznie odczytać jeden element z tablicy na raz. Korzystamy z faktu, że każdy element na liście sam w sobie jest prawidłowym obiektem JSON.

Możesz ręcznie pominąć [(dla pierwszego elementu) lub ,(dla każdego następnego elementu). Zatem myślę, że najlepszym rozwiązaniem jest użycie platformy .NET Core Utf8JsonReaderdo ustalenia, gdzie kończy się bieżący obiekt, i przesłanie zeskanowanych bajtów JsonDeserializer.

W ten sposób buforujesz tylko nieznacznie jeden obiekt naraz.

A ponieważ mówimy o wydajności, możesz uzyskać dane wejściowe z PipeReader, gdy jesteś przy tym. :-)

Timo
źródło
W ogóle nie chodzi o wydajność. Tu nie chodzi o asynchroniczną deserializację, co już robi. Chodzi o dostęp do przesyłania strumieniowego - przetwarzanie elementów JSON podczas ich parsowania ze strumienia, tak jak robi to JsonTextReader JSON.NET.
Panagiotis Kanavos
Odpowiednią klasą w Utf8Json jest JsonReader, a jak mówi autor, to dziwne. JSON.NET's JsonTextReader i System.Text.Json's Utf8JsonReader mają tę samą dziwność - musisz zapętlać i sprawdzać typ bieżącego elementu podczas pracy.
Panagiotis Kanavos
@PanagiotisKanavos Ach, tak, streaming. Tego słowa szukałem! Aktualizuję słowo „asynchroniczny” na „streaming”. Uważam, że powodem, dla którego chce się przesyłania strumieniowego, jest ograniczenie wykorzystania pamięci, co stanowi problem z wydajnością. Być może OP może potwierdzić.
Timo,
Wydajność nie oznacza szybkości. Bez względu na to, jak szybki jest deserializator, jeśli musisz przetworzyć 1 mln elementów, nie chcesz przechowywać ich w pamięci RAM, ani nie czekaj, aż wszystkie z nich ulegną deserializacji, zanim będziesz mógł przetworzyć pierwszy.
Panagiotis Kanavos
Semantyka, mój przyjacielu! Cieszę się, że mimo wszystko staramy się osiągnąć to samo.
Timo
4

TL; DR To nie jest banalne


Wygląda jak ktoś już pisał pełny kod dla Utf8JsonStreamReaderstruktury, które odczytuje bufory ze strumienia i przekazuje je do Utf8JsonRreader, umożliwiając łatwy deserializacji z JsonSerializer.Deserialize<T>(ref newJsonReader, options);. Kod też nie jest trywialny. Powiązane pytanie jest tutaj, a odpowiedź jest tutaj .

To jednak nie wystarczy - HttpClient.GetAsyncpowróci dopiero po otrzymaniu całej odpowiedzi, zasadniczo buforując wszystko w pamięci.

Aby tego uniknąć, należy używać HttpClient.GetAsync (ciąg, HttpCompletionOption) z HttpCompletionOption.ResponseHeadersRead.

Pętla deserializacji powinna również sprawdzić token anulowania i wyjść lub rzucić, jeśli jest sygnalizowany. W przeciwnym razie pętla będzie działać, dopóki cały strumień nie zostanie odebrany i przetworzony.

Ten kod jest oparty na przykładzie pokrewnej odpowiedzi i wykorzystuje HttpCompletionOption.ResponseHeadersReadi sprawdza token anulowania. Może analizować ciągi JSON, które zawierają odpowiednią tablicę elementów, np .:

[{"prop1":123},{"prop1":234}]

Pierwsze wywołanie jsonStreamReader.Read()przesuwa się na początek tablicy, a drugie przesuwa się na początek pierwszego obiektu. Sama pętla kończy się po ]wykryciu końca tablicy ( ).

private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
    //Don't cache the entire response
    using var httpResponse = await httpClient.GetAsync(url,                               
                                                       HttpCompletionOption.ResponseHeadersRead,  
                                                       cancellationToken);
    using var stream = await httpResponse.Content.ReadAsStreamAsync();
    using var jsonStreamReader = new Utf8JsonStreamReader(stream, 32 * 1024);

    jsonStreamReader.Read(); // move to array start
    jsonStreamReader.Read(); // move to start of the object

    while (jsonStreamReader.TokenType != JsonTokenType.EndArray)
    {
        //Gracefully return if cancellation is requested.
        //Could be cancellationToken.ThrowIfCancellationRequested()
        if(cancellationToken.IsCancellationRequested)
        {
            return;
        }

        // deserialize object
        var obj = jsonStreamReader.Deserialize<T>();
        yield return obj;

        // JsonSerializer.Deserialize ends on last token of the object parsed,
        // move to the first token of next object
        jsonStreamReader.Read();
    }
}

Fragmenty JSON, AKA streaming JSON aka ... *

Dość często zdarza się, że w scenariuszach przesyłania strumieniowego lub rejestrowania zdarzeń poszczególne obiekty JSON są dołączane do pliku, po jednym elemencie w wierszu, np .:

{"eventId":1}
{"eventId":2}
...
{"eventId":1234567}

To nie jest prawidłowy dokument JSON, ale poszczególne fragmenty są prawidłowe. Ma to kilka zalet w przypadku dużych zbiorów danych / wysoce współbieżnych scenariuszy. Dodanie nowego zdarzenia wymaga jedynie dodania nowego wiersza do pliku, a nie analizowania i przebudowywania całego pliku. Przetwarzanie , zwłaszcza przetwarzanie równoległe, jest łatwiejsze z dwóch powodów:

  • Poszczególne elementy można pobierać pojedynczo, po prostu czytając jedną linię ze strumienia.
  • Plik wejściowy można łatwo podzielić na partycje i podzielić na granice linii, podając każdą część osobnemu procesowi roboczemu, np. W klastrze Hadoop, lub po prostu innym wątkom w aplikacji: Oblicz punkty podziału, np. Dzieląc długość przez liczbę pracowników , a następnie poszukaj pierwszej nowej linii. Podaj wszystko do tego momentu osobnemu pracownikowi.

Korzystanie z StreamReadera

Aby to zrobić, należy użyć TextReadera, czytać jeden wiersz na raz i parsować go za pomocą JsonSerializer.Deserialize :

using var reader=new StreamReader(stream);
string line;
//ReadLineAsync() doesn't accept a CancellationToken 
while((line=await reader.ReadLineAsync()) != null)
{
    var item=JsonSerializer.Deserialize<T>(line);
    yield return item;

    if(cancellationToken.IsCancellationRequested)
    {
        return;
    }
}

Jest to o wiele prostsze niż kod, który deserializuje odpowiednią tablicę. Istnieją dwa problemy:

  • ReadLineAsync nie akceptuje tokenu anulowania
  • Każda iteracja przydziela nowy ciąg, jedną z rzeczy, których chcieliśmy uniknąć , używając System.Text.Json

Może to wystarczyć, ponieważ próba wytworzenia ReadOnlySpan<Byte>buforów wymaganych przez JsonSerializer.Deserialize nie jest trywialna.

Rurociągi i czytnik sekwencji

Aby uniknąć alokacji, musimy pobrać ReadOnlySpan<byte>ze strumienia. Wykonanie tego wymaga użycia potoków System.IO.Pipeline i struktury SequenceReader . Steve Gordon's An Introduction to SequenceReader wyjaśnia, w jaki sposób można wykorzystać tę klasę do odczytu danych ze strumienia przy użyciu ograniczników.

Niestety SequenceReaderjest strukturą referencyjną, co oznacza, że ​​nie można jej używać w metodach asynchronicznych ani lokalnych. Dlatego Steve Gordon w swoim artykule tworzy

private static SequencePosition ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)

metoda odczytu elementów z ReadOnlySequence i zwrócenie pozycji końcowej, aby PipeReader mógł z niej wznowić. Niestety chcemy zwrócić IEnumerable lub IAsyncEnumerable, a metody iteratora nie lubią parametrów ani inteż outparametrów.

Możemy zebrać deserializowane elementy z Listy lub Kolejki i zwrócić je jako pojedynczy wynik, ale to nadal przydzieli listy, bufory lub węzły i będziemy musieli poczekać, aż wszystkie elementy w buforze zostaną odserializowane przed zwróceniem:

private static (SequencePosition,List<T>) ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)

Potrzebujemy czegoś , co zachowuje się jak wyliczenie, nie wymagając metody iteratora, działa z asynchronizacją i nie buforuje wszystkiego.

Dodawanie kanałów w celu utworzenia IAsyncEnumerable

ChannelReader.ReadAllAsync zwraca IAsyncEnumerable. Możemy zwrócić ChannelReader z metod, które nie mogłyby działać jako iteratory i nadal generować strumień elementów bez buforowania.

Dostosowując kod Steve'a Gordona do korzystania z kanałów, otrzymujemy ReadItems (ChannelWriter ...) i ReadLastItemmetody. Pierwszy z nich odczytuje jeden element na raz, aż do nowej linii za pomocą ReadOnlySpan<byte> itemBytes. Może to być wykorzystane przez JsonSerializer.Deserialize. Jeśli ReadItemsnie może znaleźć separatora, zwraca swoją pozycję, aby PipelineReader mógł pobrać następny fragment ze strumienia.

Kiedy dotrzemy do ostatniego fragmentu i nie będzie już innego ogranicznika, ReadLastItem` odczytuje pozostałe bajty i deserializuje je.

Kod jest prawie identyczny z kodem Steve'a Gordona. Zamiast pisać do konsoli piszemy do ChannelWriter.

private const byte NL=(byte)'\n';
private const int MaxStackLength = 128;

private static SequencePosition ReadItems<T>(ChannelWriter<T> writer, in ReadOnlySequence<byte> sequence, 
                          bool isCompleted, CancellationToken token)
{
    var reader = new SequenceReader<byte>(sequence);

    while (!reader.End && !token.IsCancellationRequested) // loop until we've read the entire sequence
    {
        if (reader.TryReadTo(out ReadOnlySpan<byte> itemBytes, NL, advancePastDelimiter: true)) // we have an item to handle
        {
            var item=JsonSerializer.Deserialize<T>(itemBytes);
            writer.TryWrite(item);            
        }
        else if (isCompleted) // read last item which has no final delimiter
        {
            var item = ReadLastItem<T>(sequence.Slice(reader.Position));
            writer.TryWrite(item);
            reader.Advance(sequence.Length); // advance reader to the end
        }
        else // no more items in this sequence
        {
            break;
        }
    }

    return reader.Position;
}

private static T ReadLastItem<T>(in ReadOnlySequence<byte> sequence)
{
    var length = (int)sequence.Length;

    if (length < MaxStackLength) // if the item is small enough we'll stack allocate the buffer
    {
        Span<byte> byteBuffer = stackalloc byte[length];
        sequence.CopyTo(byteBuffer);
        var item=JsonSerializer.Deserialize<T>(byteBuffer);
        return item;        
    }
    else // otherwise we'll rent an array to use as the buffer
    {
        var byteBuffer = ArrayPool<byte>.Shared.Rent(length);

        try
        {
            sequence.CopyTo(byteBuffer);
            var item=JsonSerializer.Deserialize<T>(byteBuffer);
            return item;
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(byteBuffer);
        }

    }    
}

DeserializeToChannel<T>Metoda stwarza czytelnikowi rurociągu na górze strumienia, tworzy kanał i rozpoczyna zadanie pracownika, który analizuje kawałki i popycha je do kanału:

ChannelReader<T> DeserializeToChannel<T>(Stream stream, CancellationToken token)
{
    var pipeReader = PipeReader.Create(stream);    
    var channel=Channel.CreateUnbounded<T>();
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
        while (!token.IsCancellationRequested)
        {
            var result = await pipeReader.ReadAsync(token); // read from the pipe

            var buffer = result.Buffer;

            var position = ReadItems(writer,buffer, result.IsCompleted,token); // read complete items from the current buffer

            if (result.IsCompleted) 
                break; // exit if we've read everything from the pipe

            pipeReader.AdvanceTo(position, buffer.End); //advance our position in the pipe
        }

        pipeReader.Complete(); 
    },token)
    .ContinueWith(t=>{
        pipeReader.Complete();
        writer.TryComplete(t.Exception);
    });

    return channel.Reader;
}

ChannelReader.ReceiveAllAsync()może być wykorzystany do zużycia wszystkich przedmiotów poprzez IAsyncEnumerable<T>:

var reader=DeserializeToChannel<MyEvent>(stream,cts.Token);
await foreach(var item in reader.ReadAllAsync(cts.Token))
{
    //Do something with it 
}    
Panagiotis Kanavos
źródło
0

Wydaje się, że musisz zaszczepić własny czytnik strumieniowy. Musisz czytać bajty jeden po drugim i zatrzymać się, gdy tylko zakończy się definicja obiektu. To jest naprawdę dość niski poziom. Jako taki NIE BĘDZIESZ ładować całego pliku do pamięci RAM, ale raczej weź udział, z którym masz do czynienia. Czy to wydaje się być odpowiedzią?

Sereja Bogolubov
źródło
-2

Może mógłbyś użyć Newtonsoft.Jsonserializatora? https://www.newtonsoft.com/json/help/html/Performance.htm

Szczególnie patrz sekcja:

Zoptymalizuj wykorzystanie pamięci

Edytować

Możesz spróbować deserializować wartości z JsonTextReader, np

using (var textReader = new StreamReader(stream))
using (var reader = new JsonTextReader(textReader))
{
    while (await reader.ReadAsync(cancellationToken))
    {
        yield return reader.Value;
    }
}
Miłosz Wieczorek
źródło
To nie odpowiada na pytanie. W ogóle nie chodzi o wydajność, chodzi o dostęp do przesyłania strumieniowego bez ładowania wszystkiego do pamięci
Panagiotis Kanavos
Czy otworzyłeś powiązany link lub po prostu powiedziałeś, co myślisz? W linku, który wysłałem w sekcji, o której wspomniałem, jest fragment kodu, w jaki sposób dokonać deserializacji JSON ze strumienia.
Miłosz Wieczorek
Przeczytaj jeszcze raz pytanie - OP pyta, jak przetwarzać elementy bez deserializacji wszystkiego w pamięci. Nie tylko czytaj ze strumienia, ale przetwarzaj tylko to, co pochodzi ze strumienia. I don't want them to be in memory all at once, but I would rather read and process them one by one.Odpowiednią klasą w JSON.NET jest JsonTextReader.
Panagiotis Kanavos
W każdym razie odpowiedź tylko do linku nie jest uważana za dobrą odpowiedź i nic w tym linku nie odpowiada na pytanie PO. Link do JsonTextReadera byłby lepszy
Panagiotis Kanavos