Przeanalizuj duży plik JSON w Nodejs

98

Mam plik, który przechowuje wiele obiektów JavaScript w postaci JSON i muszę go przeczytać, utworzyć każdy z obiektów i coś z nimi zrobić (w moim przypadku wstawić je do bazy danych). Obiekty JavaScript można przedstawić w formacie:

Format A:

[{name: 'thing1'},
....
{name: 'thing999999999'}]

lub Format B:

{name: 'thing1'}         // <== My choice.
...
{name: 'thing999999999'}

Zwróć uwagę, że ...symbol oznacza wiele obiektów JSON. Zdaję sobie sprawę, że mógłbym wczytać cały plik do pamięci, a następnie użyć w JSON.parse()ten sposób:

fs.readFile(filePath, 'utf-8', function (err, fileContents) {
  if (err) throw err;
  console.log(JSON.parse(fileContents));
});

Jednak plik może być naprawdę duży, wolałbym użyć do tego strumienia. Problem, który widzę w przypadku strumienia, polega na tym, że zawartość pliku może zostać podzielona na fragmenty danych w dowolnym momencie, więc jak mogę używać JSON.parse()na takich obiektach?

Idealnie byłoby, gdyby każdy obiekt był odczytywany jako oddzielny fragment danych, ale nie jestem pewien, jak to zrobić .

var importStream = fs.createReadStream(filePath, {flags: 'r', encoding: 'utf-8'});
importStream.on('data', function(chunk) {

    var pleaseBeAJSObject = JSON.parse(chunk);           
    // insert pleaseBeAJSObject in a database
});
importStream.on('end', function(item) {
   console.log("Woot, imported objects into the database!");
});*/

Uwaga, chciałbym zapobiec wczytywaniu całego pliku do pamięci. Efektywność czasowa nie ma dla mnie znaczenia. Tak, mógłbym spróbować odczytać wiele obiektów na raz i wstawić je wszystkie na raz, ale to jest poprawka wydajności - potrzebuję sposobu, który gwarantuje, że nie spowoduje przeciążenia pamięci, bez względu na to, ile obiektów jest zawartych w pliku .

Mogę użyć, FormatAa FormatBmoże czegoś innego, po prostu określ w odpowiedzi. Dzięki!

dgh
źródło
W przypadku formatu B możesz przeanalizować fragment w poszukiwaniu nowych wierszy i wyodrębnić każdą całą linię, łącząc resztę, jeśli odcina się w środku. Może być jednak bardziej elegancki sposób. Nie pracowałem zbyt wiele ze strumieniami.
travis

Odpowiedzi:

82

Aby przetworzyć plik wiersz po wierszu, wystarczy oddzielić odczyt pliku od kodu, który działa na tym wejściu. Możesz to osiągnąć, buforując dane wejściowe, dopóki nie trafisz w nową linię. Zakładając, że mamy jeden obiekt JSON w linii (w zasadzie format B):

var stream = fs.createReadStream(filePath, {flags: 'r', encoding: 'utf-8'});
var buf = '';

stream.on('data', function(d) {
    buf += d.toString(); // when data is read, stash it in a string buffer
    pump(); // then process the buffer
});

function pump() {
    var pos;

    while ((pos = buf.indexOf('\n')) >= 0) { // keep going while there's a newline somewhere in the buffer
        if (pos == 0) { // if there's more than one newline in a row, the buffer will now start with a newline
            buf = buf.slice(1); // discard it
            continue; // so that the next iteration will start with data
        }
        processLine(buf.slice(0,pos)); // hand off the line
        buf = buf.slice(pos+1); // and slice the processed data off the buffer
    }
}

function processLine(line) { // here's where we do something with a line

    if (line[line.length-1] == '\r') line=line.substr(0,line.length-1); // discard CR (0x0D)

    if (line.length > 0) { // ignore empty lines
        var obj = JSON.parse(line); // parse the JSON
        console.log(obj); // do something with the data here!
    }
}

Za każdym razem, gdy strumień pliku odbiera dane z systemu plików, są one przechowywane w buforze, a następnie pumpwywoływane.

Jeśli w buforze nie ma nowej linii, pumppo prostu wraca bez robienia czegokolwiek. Więcej danych (i potencjalnie nowa linia) zostanie dodanych do bufora następnym razem, gdy strumień otrzyma dane, a następnie będziemy mieć kompletny obiekt.

Jeśli jest nowa linia, pumpodcina bufor od początku do nowej linii i przekazuje go process. Następnie ponownie sprawdza, czy w buforze ( whilepętli) znajduje się kolejna nowa linia . W ten sposób możemy przetworzyć wszystkie wiersze, które zostały odczytane w bieżącej porcji.

Wreszcie processjest wywoływana raz na linię wejściową. Jeśli występuje, usuwa znak powrotu karetki (aby uniknąć problemów z zakończeniami linii - LF vs CRLF), a następnie wywołuje JSON.parsejeden z nich. W tym momencie możesz zrobić ze swoim obiektem wszystko, czego potrzebujesz.

Zauważ, że JSON.parsejest ściśle określony, co akceptuje jako dane wejściowe; musisz cytować swoje identyfikatory i wartości ciągów w podwójnych cudzysłowach . Innymi słowy, {name:'thing1'}zgłosi błąd; musisz użyć {"name":"thing1"}.

Ponieważ jednorazowo w pamięci nigdy nie będzie więcej niż porcja danych, będzie to niezwykle wydajne pod względem pamięci. Będzie też niezwykle szybki. Szybki test wykazał, że przetworzyłem 10000 wierszy w czasie poniżej 15 ms.

josh3736
źródło
12
Ta odpowiedź jest teraz zbędna. Użyj JSONStream i masz wsparcie po wyjęciu z pudełka.
arcseldon
2
Nazwa funkcji „proces” jest zła. „proces” powinien być zmienną systemową. Ten błąd mylił mnie przez wiele godzin.
Zhigong Li
17
@arcseldon Nie sądzę, że fakt, że istnieje biblioteka, która to robi, sprawia, że ​​ta odpowiedź jest zbędna. Z pewnością warto wiedzieć, jak można to zrobić bez modułu.
Kevin B
3
Nie jestem pewien, czy to zadziała w przypadku zminimalizowanego pliku JSON. Co by się stało, gdyby cały plik był zawinięty w jedną linię i użycie takich separatorów nie było możliwe? Jak więc rozwiążemy ten problem?
SLearner
7
Biblioteki innych firm nie są stworzone z magii, o której wiesz. Są jak ta odpowiedź, rozbudowanymi wersjami ręcznie wyrabianych rozwiązań, ale po prostu spakowanymi i oznaczonymi jako program. Zrozumienie, jak działają rzeczy, jest znacznie ważniejsze i bardziej istotne niż ślepe wrzucanie danych do biblioteki, oczekując wyników. Tylko mówię :)
zanona
34

Tak jak myślałem, że fajnie byłoby napisać strumieniowy parser JSON, pomyślałem również, że może powinienem przeprowadzić szybkie wyszukiwanie, aby sprawdzić, czy jest już dostępny.

Okazuje się, że jest.

  • JSONStream „przesyłanie strumieniowe JSON.parse i stringify”

Odkąd go właśnie znalazłem, oczywiście go nie używałem, więc nie mogę komentować jego jakości, ale będę zainteresowany, czy to działa.

To działa, biorąc pod uwagę następujący JavaScript i _.isString:

stream.pipe(JSONStream.parse('*'))
  .on('data', (d) => {
    console.log(typeof d);
    console.log("isString: " + _.isString(d))
  });

Spowoduje to rejestrowanie obiektów w momencie ich wejścia, jeśli strumień jest tablicą obiektów. Dlatego jedyną buforowaną rzeczą jest jeden obiekt na raz.

użytkownik1106925
źródło
29

Od października 2014 r. Możesz po prostu zrobić coś takiego (używając JSONStream) - https://www.npmjs.org/package/JSONStream

var fs = require('fs'),
    JSONStream = require('JSONStream'),

var getStream() = function () {
    var jsonData = 'myData.json',
        stream = fs.createReadStream(jsonData, { encoding: 'utf8' }),
        parser = JSONStream.parse('*');
    return stream.pipe(parser);
}

getStream().pipe(MyTransformToDoWhateverProcessingAsNeeded).on('error', function (err) {
    // handle any errors
});

Aby zademonstrować na praktycznym przykładzie:

npm install JSONStream event-stream

data.json:

{
  "greeting": "hello world"
}

hello.js:

var fs = require('fs'),
    JSONStream = require('JSONStream'),
    es = require('event-stream');

var getStream = function () {
    var jsonData = 'data.json',
        stream = fs.createReadStream(jsonData, { encoding: 'utf8' }),
        parser = JSONStream.parse('*');
    return stream.pipe(parser);
};

getStream()
    .pipe(es.mapSync(function (data) {
        console.log(data);
    }));
$ node hello.js
// hello world
arcseldon
źródło
2
Jest to głównie prawda i przydatne, ale myślę, że musisz to zrobić parse('*')lub nie otrzymasz żadnych danych.
John Zwinck
@JohnZwinck Dziękuję, zaktualizowałem odpowiedź i dodałem działający przykład, aby w pełni to zademonstrować.
arcseldon
w pierwszym bloku kodu var getStream() = function () {należy usunąć pierwszy zestaw nawiasów .
givemesnacks
1
To nie powiodło się z powodu błędu braku pamięci z plikiem json 500 MB.
Keith John Hutchison
18

Zdaję sobie sprawę, że jeśli to możliwe, chcesz uniknąć wczytywania całego pliku JSON do pamięci, jednak jeśli masz dostępną pamięć, może to nie być zły pomysł pod względem wydajności. Użycie funkcji require () node.js w pliku json ładuje dane do pamięci bardzo szybko.

Przeprowadziłem dwa testy, aby zobaczyć, jak wygląda wydajność podczas drukowania atrybutu z każdej funkcji z 81 MB pliku geojson.

W pierwszym teście wczytałem do pamięci cały plik geojson za pomocą var data = require('./geo.json') . Zajęło to 3330 milisekund, a wydrukowanie atrybutu z każdej funkcji zajęło 804 milisekund, co daje łącznie 4134 milisekund. Okazało się jednak, że node.js zużywa 411 MB pamięci.

W drugim teście użyłem odpowiedzi @ arcseldon z JSONStream + strumień zdarzeń. Zmodyfikowałem zapytanie JSONPath, aby wybrać tylko to, czego potrzebowałem. Tym razem pamięć nigdy nie przekroczyła 82 MB, jednak teraz całość zajęła 70 sekund!

Evan Siroky
źródło
18

Miałem podobny wymóg, muszę odczytać duży plik json w węźle js i przetwarzać dane w kawałkach i wywołać interfejs API i zapisać w mongodb. inputFile.json wygląda tak:

{
 "customers":[
       { /*customer data*/},
       { /*customer data*/},
       { /*customer data*/}....
      ]
}

Teraz użyłem JsonStream i EventStream, aby osiągnąć to synchronicznie.

var JSONStream = require("JSONStream");
var es = require("event-stream");

fileStream = fs.createReadStream(filePath, { encoding: "utf8" });
fileStream.pipe(JSONStream.parse("customers.*")).pipe(
  es.through(function(data) {
    console.log("printing one customer object read from file ::");
    console.log(data);
    this.pause();
    processOneCustomer(data, this);
    return data;
  }),
  function end() {
    console.log("stream reading ended");
    this.emit("end");
  }
);

function processOneCustomer(data, es) {
  DataModel.save(function(err, dataModel) {
    es.resume();
  });
}
karthick N
źródło
Dziękuję bardzo za dodanie odpowiedzi, moja sprawa również wymagała synchronicznej obsługi. Jednak po przetestowaniu nie było możliwe wywołanie „end ()” jako wywołania zwrotnego po zakończeniu potoku. Uważam, że jedyną rzeczą, którą można zrobić, jest dodanie zdarzenia, co powinno się wydarzyć po „zakończeniu” / „zamknięciu” strumienia za pomocą „plikuStream.on („ close ”, ...) ´.
nonNumericalFloat
6

Napisałem moduł, który to potrafi, nazwany BFJ . W szczególności metody bfj.matchmożna użyć do podzielenia dużego strumienia na oddzielne fragmenty JSON:

const bfj = require('bfj');
const fs = require('fs');

const stream = fs.createReadStream(filePath);

bfj.match(stream, (key, value, depth) => depth === 0, { ndjson: true })
  .on('data', object => {
    // do whatever you need to do with object
  })
  .on('dataError', error => {
    // a syntax error was found in the JSON
  })
  .on('error', error => {
    // some kind of operational error occurred
  })
  .on('end', error => {
    // finished processing the stream
  });

Tutaj bfj.matchzwraca czytelny strumień w trybie obiektowym, który otrzyma przeanalizowane elementy danych i otrzyma 3 argumenty:

  1. Czytelny strumień zawierający wejściowy kod JSON.

  2. Predykat wskazujący, które elementy z przeanalizowanego kodu JSON zostaną przekazane do strumienia wyników.

  3. Obiekt opcji wskazujący, że dane wejściowe to rozdzielany znakami nowego wiersza JSON (ma to przetworzyć format B z pytania, nie jest wymagany dla formatu A).

Po wywołaniu bfj.matchprzeanalizuje dane JSON ze strumienia wejściowego w pierwszej kolejności, wywołując predykat z każdą wartością, aby określić, czy wypchnąć ten element do strumienia wyników. Do predykatu przekazywane są trzy argumenty:

  1. Klucz właściwości lub indeks tablicy (będzie to undefineddla elementów najwyższego poziomu).

  2. Wartość sama w sobie.

  3. Głębokość elementu w strukturze JSON (zero dla elementów najwyższego poziomu).

Oczywiście w razie potrzeby można użyć bardziej złożonego predykatu, zgodnie z wymaganiami. Możesz również przekazać ciąg lub wyrażenie regularne zamiast funkcji predykatu, jeśli chcesz wykonać proste dopasowania względem kluczy właściwości.

Phil Booth
źródło
4

Rozwiązałem ten problem za pomocą modułu split npm . Przekieruj strumień na podzielony, a to „ rozbije strumień i złóż go ponownie, tak aby każda linia była kawałkiem ”.

Przykładowy kod:

var fs = require('fs')
  , split = require('split')
  ;

var stream = fs.createReadStream(filePath, {flags: 'r', encoding: 'utf-8'});
var lineStream = stream.pipe(split());
linestream.on('data', function(chunk) {
    var json = JSON.parse(chunk);           
    // ...
});
Brian Leathem
źródło
4

Jeśli masz kontrolę nad plikiem wejściowym i jest to tablica obiektów, możesz łatwiej rozwiązać ten problem. Rozmieść, aby wyprowadzić plik z każdym rekordem w jednej linii, na przykład:

[
   {"key": value},
   {"key": value},
   ...

To jest nadal poprawny JSON.

Następnie użyj modułu readline node.js, aby przetworzyć je po jednej linii na raz.

var fs = require("fs");

var lineReader = require('readline').createInterface({
    input: fs.createReadStream("input.txt")
});

lineReader.on('line', function (line) {
    line = line.trim();

    if (line.charAt(line.length-1) === ',') {
        line = line.substr(0, line.length-1);
    }

    if (line.charAt(0) === '{') {
        processRecord(JSON.parse(line));
    }
});

function processRecord(record) {
    // Process the records one at a time here! 
}
Steve Hanov
źródło
-1

Myślę, że musisz skorzystać z bazy danych. MongoDB to dobry wybór w tym przypadku, ponieważ jest kompatybilny z JSON.

AKTUALIZACJA : Możesz użyć narzędzia mongoimport , aby zaimportować dane JSON do MongoDB.

mongoimport --collection collection --file collection.json
Vadim Baryshev
źródło
1
To nie odpowiada na pytanie. Zwróć uwagę, że druga linia pytania mówi, że chce to zrobić, aby pobrać dane do bazy danych .
josh3736
mongoimport importuje tylko plik o rozmiarze do 16 MB.
Haziq Ahmed