Obsługa błędów ze strumieniami node.js.

164

Jaki jest prawidłowy sposób obsługi błędów w strumieniach? Wiem już, że istnieje zdarzenie „błąd”, którego można słuchać, ale chcę poznać więcej szczegółów na temat dowolnie skomplikowanych sytuacji.

Na początek, co robisz, gdy chcesz zrobić prosty łańcuch rur:

input.pipe(transformA).pipe(transformB).pipe(transformC)...

Jak poprawnie utworzyć jedną z tych transformacji, aby błędy były poprawnie obsługiwane?

Więcej powiązanych pytań:

  • kiedy wystąpi błąd, co dzieje się ze zdarzeniem „koniec”? Czy nigdy się nie strzela? Czy czasami się strzela? Czy to zależy od transformacji / strumienia? Jakie są tutaj standardy?
  • czy są jakieś mechanizmy rozsyłania błędów przez rury?
  • czy domeny skutecznie rozwiązują ten problem? Przykłady byłyby fajne.
  • czy błędy wynikające ze zdarzeń „error” mają ślady stosu? Czasami? Nigdy? czy jest sposób, aby je od nich zdobyć?
BT
źródło
1
To nie jest trywialne. Promiseframeworki czynią to o wiele prostszym
slezica
27
Niestety obietnice / kontrakty futures nie mogą naprawdę pomóc w przypadku strumieni ...
BT,

Odpowiedzi:

222

przekształcać

Strumienie transformacji są zarówno czytelne, jak i zapisywalne, a zatem są naprawdę dobrymi strumieniami „środkowymi”. Z tego powodu czasami nazywa się je throughstrumieniami. W ten sposób są podobne do strumienia dupleksowego, z wyjątkiem tego, że zapewniają przyjemny interfejs do manipulowania danymi, a nie tylko ich wysyłania. Celem strumienia transformacji jest manipulowanie danymi podczas przesyłania ich przez strumień. Możesz na przykład chcieć wykonać kilka wywołań asynchronicznych lub wyprowadzić kilka pól, zmienić mapowanie niektórych rzeczy itp.


Gdzie możesz umieścić strumień transformacji


Aby dowiedzieć się, jak utworzyć strumień transformacji, zobacz tutaj i tutaj . Wszystko co musisz zrobić to :

  1. zawierać moduł strumienia
  2. Utwórz wystąpienie (lub dziedzicz z) klasy Transform
  3. zaimplementuj _transformmetodę, która przyjmuje plik (chunk, encoding, callback).

Kawałek to twoje dane. W większości przypadków nie musisz martwić się o kodowanie, jeśli pracujesz w programie objectMode = true. Wywołanie zwrotne jest wywoływane po zakończeniu przetwarzania porcji. Ten fragment jest następnie wypychany do następnego strumienia.

Jeśli potrzebujesz fajnego modułu pomocniczego, który pozwoli ci naprawdę łatwo przejść przez stream, proponuję przez through2 .

Aby uzyskać informacje na temat obsługi błędów, czytaj dalej.

rura

W łańcuchu potoków błędy obsługi są rzeczywiście nietrywialne. Zgodnie z tym wątkiem .pipe () nie jest zbudowany do przekazywania błędów. Więc coś w stylu ...

var a = createStream();
a.pipe(b).pipe(c).on('error', function(e){handleError(e)});

... nasłuchiwał tylko błędów w strumieniu c. Gdyby wyemitowano zdarzenie błędu a, nie zostałoby ono przekazane, a faktycznie zostałoby odrzucone. Aby zrobić to poprawnie:

var a = createStream();
a.on('error', function(e){handleError(e)})
.pipe(b)
.on('error', function(e){handleError(e)})
.pipe(c)
.on('error', function(e){handleError(e)});

Teraz, chociaż drugi sposób jest bardziej szczegółowy, możesz przynajmniej zachować kontekst, w którym występują błędy. Zwykle jest to dobra rzecz.

Jedna biblioteka, którą uważam za pomocną, jeśli masz przypadek, w którym chcesz uchwycić tylko błędy w miejscu docelowym i nie obchodzi Cię tak bardzo, gdzie to się stało, to strumień zdarzeń .

koniec

Gdy zostanie wyzwolone zdarzenie błędu, zdarzenie końcowe nie zostanie uruchomione (jawnie). Emisja zdarzenia błędu zakończy transmisję.

domeny

Z mojego doświadczenia wynika, że ​​domeny działają bardzo dobrze przez większość czasu. Jeśli masz nieobsłużone zdarzenie błędu (np. Emitowanie błędu w strumieniu bez nasłuchiwania), serwer może ulec awarii. Teraz, jak wskazuje powyższy artykuł, możesz umieścić strumień w domenie, która powinna poprawnie wychwycić wszystkie błędy.

var d = domain.create();
 d.on('error', handleAllErrors);
 d.run(function() {
     fs.createReadStream(tarball)
       .pipe(gzip.Gunzip())
       .pipe(tar.Extract({ path: targetPath }))
       .on('close', cb);
 });

Piękno domen polega na tym, że zachowują ślady stosu. Chociaż strumień wydarzeń również dobrze sobie z tym radzi.

Aby uzyskać więcej informacji, zapoznaj się z podręcznikiem transmisji strumieniowej . Całkiem dogłębne, ale bardzo przydatne i zawiera świetne linki do wielu pomocnych modułów.

mshell_lauren
źródło
To naprawdę świetne informacje, dzięki! Czy mógłbyś dodać trochę o tym, dlaczego chcesz utworzyć strumień transformacji i dlaczego odnosi się to do mojego pytania?
BT
Jasne - chociaż pomyślałem, że to związane, odkąd o to zapytałeś; )
mshell_lauren
1
Opublikuj na ten temat przez isaccs na Grupach dyskusyjnych Google - nodejs: groups.google.com/d/msg/nodejs/lJYT9hZxFu0/L59CFbqWGyYJ (nie grokbase)
jpillora
Ta odpowiedź jest napisana doskonale. Zbadam sugestię domeny - wydaje się, że jest to rozwiązanie, którego szukałem.
Średnik
12
Zauważ, że nie musisz .on('error')a.on('error', function(e){handleError(e)})a.on('error', handleError)
opakowywać
28

Jeśli używasz node> = v10.0.0, możesz użyć stream.pipeline i stream.finished .

Na przykład:

const { pipeline, finished } = require('stream');

pipeline(
  input, 
  transformA, 
  transformB, 
  transformC, 
  (err) => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
});


finished(input, (err) => {
  if (err) {
    console.error('Stream failed', err);
  } else {
    console.log('Stream is done reading');
  }
});

Więcej dyskusji znajdziesz w tym PR na githubie.

Shusson
źródło
1
Dlaczego miałbyś używać finished, skoro pipelinejuż masz oddzwonienie?
Marcos Pereira
4
Możesz chcieć inaczej obsługiwać błędy między potokiem a poszczególnymi strumieniami.
shusson,
25

domeny są przestarzałe. nie potrzebujesz ich.

w przypadku tego pytania rozróżnienie między przekształcaniem a zapisywalnością nie jest tak ważne.

Odpowiedź mshell_lauren jest świetna, ale alternatywnie możesz również wyraźnie nasłuchiwać zdarzenia błędu w każdym strumieniu, który Twoim zdaniem może powodować błąd. i jeśli wolisz, użyj ponownie funkcji obsługi.

var a = createReadableStream()
var b = anotherTypeOfStream()
var c = createWriteStream()

a.on('error', handler)
b.on('error', handler)
c.on('error', handler)

a.pipe(b).pipe(c)

function handler (err) { console.log(err) }

Dzięki temu unikniesz niesławnego, nieprzechwyconego wyjątku, jeśli jeden z tych strumieni uruchomi zdarzenie błędu

Bent Cardan
źródło
3
lol mieć obsługi 3 różne zdarzenia błędów i modlić się bawić, że ktokolwiek napisał 3 różne libs strumieniowej prawidłowo wdrożony błąd obsługi
Alexander Mills
4
@Alex Mills 1) Na czym polega problem obsługi 3 zdarzeń i dlaczego są one „różne”, skoro ich typ jest taki sam - errorrównie dobrze można wtedy poprzestać na tym, że każde zdarzenie jest odrębne; 2) Jakie biblioteki strumieniowe są napisane powyżej, poza natywną funkcjonalnością Node.js? i 3) dlaczego ma znaczenie, w jaki sposób obsługują zdarzenia wewnętrznie, skoro to oczywiście pozwala każdemu na dołączenie dodatkowych modułów obsługi błędów do tego, co już istnieje?
amn
10

Błędy z całego łańcucha można propagować do strumienia znajdującego się najbardziej po prawej stronie za pomocą prostej funkcji:

function safePipe (readable, transforms) {
    while (transforms.length > 0) {
        var new_readable = transforms.shift();
        readable.on("error", function(e) { new_readable.emit("error", e); });
        readable.pipe(new_readable);
        readable = new_readable;
    }
    return readable;
}

które mogą być używane jak:

safePipe(readable, [ transform1, transform2, ... ]);
Gleba
źródło
5

.on("error", handler)zajmuje się tylko błędami Stream, ale jeśli używasz niestandardowych strumieni Transform, .on("error", handler)nie wychwytuj błędów występujących w _transformfunkcji. Więc można zrobić coś takiego, aby kontrolować przepływ aplikacji: -

thissłowo kluczowe w _transformfunkcji odnosi się do Streamsamego siebie, czyli pliku EventEmitter. Możesz więc użyć try catchjak poniżej, aby wyłapać błędy, a następnie przekazać je do niestandardowych programów obsługi zdarzeń.

// CustomTransform.js
CustomTransformStream.prototype._transform = function (data, enc, done) {
  var stream = this
  try {
    // Do your transform code
  } catch (e) {
    // Now based on the error type, with an if or switch statement
    stream.emit("CTError1", e)
    stream.emit("CTError2", e)
  }
  done()
}

// StreamImplementation.js
someReadStream
  .pipe(CustomTransformStream)
  .on("CTError1", function (e) { console.log(e) })
  .on("CTError2", function (e) { /*Lets do something else*/ })
  .pipe(someWriteStream)

W ten sposób możesz oddzielić logikę i programy obsługi błędów. Możesz też zdecydować się na obsługę tylko niektórych błędów, a ignorowanie innych.

UPDATE
Alternative: RXJS Observable

Vikas Gautam
źródło
4

Użyj pakietu multipipe , aby połączyć kilka strumieni w jeden strumień dupleksowy. I obsługuj błędy w jednym miejscu.

const pipe = require('multipipe')

// pipe streams
const stream = pipe(streamA, streamB, streamC) 


// centralized error handling
stream.on('error', fn)
Sergey Savenko
źródło
1

Użyj wzorca Node.js, tworząc mechanikę strumienia Transform i wywołując jej wywołanie zwrotne donez argumentem, aby propagować błąd:

var transformStream1 = new stream.Transform(/*{objectMode: true}*/);

transformStream1.prototype._transform = function (chunk, encoding, done) {
  //var stream = this;

  try {
    // Do your transform code
    /* ... */
  } catch (error) {
    // nodejs style for propagating an error
    return done(error);
  }

  // Here, everything went well
  done();
}

// Let's use the transform stream, assuming `someReadStream`
// and `someWriteStream` have been defined before
someReadStream
  .pipe(transformStream1)
  .on('error', function (error) {
    console.error('Error in transformStream1:');
    console.error(error);
    process.exit(-1);
   })
  .pipe(someWriteStream)
  .on('close', function () {
    console.log('OK.');
    process.exit();
  })
  .on('error', function (error) {
    console.error(error);
    process.exit(-1);
   });
Derek
źródło
Hmm, więc mówisz, że gdyby wszystkie procesory strumieniowe były zbudowane w ten sposób, błędy byłyby propagowane?
BT,
-2

Funkcja Try catch nie przechwyci błędów, które wystąpiły w strumieniu, ponieważ są one zgłaszane po zakończeniu działania kodu wywołującego. możesz zapoznać się z dokumentacją:

https://nodejs.org/dist/latest-v10.x/docs/api/errors.html

Mehran
źródło
Dzięki, ale to wcale nie odpowiada na pytanie.
BT
Podanie mi 40-stronicowego dokumentu nie jest pomocne. Jak myślisz, do czego powinienem się odwołać na tej gigantycznej stronie? Czy przeczytałeś też moje pytanie? Moje pytanie nie brzmi: „czy próba łapania pracy ze strumieniami?” Wiem już dobrze, że try-catch nie zadziała z błędami asynchronicznymi, np. Z potokami przetwarzania strumieniowego.
BT,