Jak tworzyć strumienie z łańcucha w Node.Js?

177

Używam biblioteki ya-csv , która oczekuje pliku lub strumienia jako danych wejściowych, ale mam ciąg.

Jak przekonwertować ten ciąg na strumień w Node?

pathikrit
źródło

Odpowiedzi:

27

W węźle 10.17 stream.Readable mają frommetodę łatwego tworzenia strumieni z dowolnej iterowalnej (która obejmuje literały tablicowe):

const { Readable } = require("stream")

const readable = Readable.from(["input string"])

readable.on("data", (chunk) => {
  console.log(chunk) // will be called once with `"input string"`
})

Zauważ, że przynajmniej między 10.17 a 12.3, łańcuch sam w sobie jest iterowalny, więc Readable.from("input string")będzie działał, ale emituje jedno zdarzenie na znak. Readable.from(["input string"])wyemituje jedno zdarzenie na pozycję w tablicy (w tym przypadku jeden element).

Zwróć również uwagę, że w późniejszych węzłach (prawdopodobnie 12.3, ponieważ dokumentacja mówi, że funkcja została wówczas zmieniona), nie jest już konieczne zawijanie łańcucha w tablicę.

https://nodejs.org/api/stream.html#stream_stream_readable_from_iterable_options

Fizker
źródło
Według stream.Readable.from , Calling Readable.from (string) lub Readable.from (buffer) nie będzie
skrót
Mój błąd. Funkcja została dodana w 10.7 i zachowywała się tak, jak pierwotnie opisałem. Od czasu do czasu łańcuchy nie muszą już być zawijane w tablice (od 12.3 nie iteruje już każdego znaku indywidualnie).
Fizker
186

Ponieważ @substack poprawił mnie w #node , nowe API strumieni w Node v10 ułatwia to:

const Readable = require('stream').Readable;
const s = new Readable();
s._read = () => {}; // redundant? see update below
s.push('your text here');
s.push(null);

... po którym można swobodnie rura to lub w inny sposób przekazać go do zamierzonego odbiorcy.

Nie jest tak czysty, jak jednolinijkowy resumer , ale pozwala uniknąć dodatkowej zależności.

( Aktualizacja: do tej pory w wersjach od 10.10.26 do 9.2.1 wywołanie pushbezpośrednio z zachęty REPL ulegnie awarii z not implementedwyjątkiem, jeśli nie zostało to ustawione _read. Nie nastąpi awaria wewnątrz funkcji lub skryptu. Jeśli niespójność powoduje, że nerwowy, obejmują noop.)

Garth Kidd
źródło
6
Z dokumentacji (link) : „Wszystkie implementacje strumienia czytelnego muszą zapewniać _readmetodę pobierania danych z podstawowego zasobu”.
Felix Rabe
2
@eye_mew musisz najpierw wymagać ('stream')
Jim Jones,
8
Dlaczego wciskasz nullsię do bufora strumienia?
dopatraman
5
@dopatraman nullinformuje strumień, że zakończył odczytywanie wszystkich danych i zamknięcie strumienia
chrishiestand
2
Wygląda na to, że nie powinieneś tego robić w ten sposób. Cytując dokumenty : „ readable.push()Metoda ma być wywoływana tylko przez czytelne implementacje i tylko z poziomu readable._read()metody”.
Axel Rauschmayer
127

Nie używaj odpowiedzi resumeru Jo Liss. W większości przypadków zadziała, ale w moim przypadku straciłem dobre 4 lub 5 godzin na znajdowanie błędów. Nie ma do tego potrzeby stosowania modułów innych firm.

NOWA ODPOWIEDŹ :

var Readable = require('stream').Readable

var s = new Readable()
s.push('beep')    // the string you want
s.push(null)      // indicates end-of-file basically - the end of the stream

Powinien to być w pełni zgodny czytelny strumień. Zobacz tutaj, aby uzyskać więcej informacji na temat prawidłowego korzystania ze strumieni.

STARE ODPOWIEDŹ : Po prostu użyj natywnego strumienia PassThrough:

var stream = require("stream")
var a = new stream.PassThrough()
a.write("your string")
a.end()

a.pipe(process.stdout) // piping will work as normal
/*stream.on('data', function(x) {
   // using the 'data' event works too
   console.log('data '+x)
})*/
/*setTimeout(function() {
   // you can even pipe after the scheduler has had time to do other things
   a.pipe(process.stdout) 
},100)*/

a.on('end', function() {
    console.log('ended') // the end event will be called properly
})

Należy zauważyć, że zdarzenie „close” nie jest emitowane (co nie jest wymagane przez interfejsy strumieniowe).

BT
źródło
2
@Finn Nie potrzebujesz parens w javascript, jeśli nie ma żadnych argumentów
BT
nie używaj "var" w 2018 roku! ale const
stackdave
30

Po prostu utwórz nową instancję streammodułu i dostosuj ją do swoich potrzeb:

var Stream = require('stream');
var stream = new Stream();

stream.pipe = function(dest) {
  dest.write('your string');
  return dest;
};

stream.pipe(process.stdout); // in this case the terminal, change to ya-csv

lub

var Stream = require('stream');
var stream = new Stream();

stream.on('data', function(data) {
  process.stdout.write(data); // change process.stdout to ya-csv
});

stream.emit('data', 'this is my string');
zemirco
źródło
13
Ten kod łamie konwencje dotyczące strumieniowania. pipe()ma przynajmniej zwrócić strumień docelowy.
greim
2
Zdarzenie końcowe nie jest wywoływane, jeśli używasz tego kodu. To nie jest dobry sposób na tworzenie strumienia, którego można używać ogólnie.
BT,
12

Edycja: odpowiedź Gartha jest prawdopodobnie lepsza.

Mój stary tekst odpowiedzi został zachowany poniżej.


Aby przekonwertować ciąg do strumienia, można użyć wstrzymane przez strumień:

through().pause().queue('your string').end()

Przykład:

var through = require('through')

// Create a paused stream and buffer some data into it:
var stream = through().pause().queue('your string').end()

// Pass stream around:
callback(null, stream)

// Now that a consumer has attached, remember to resume the stream:
stream.resume()
Jo Liss
źródło
Nie mogłem zmusić rozwiązania zeMirco do pracy w moim przypadku użycia, ale resumerdziałało całkiem dobrze. Dzięki!
mpen
Sugestia @substack resumer działała bardzo dobrze. Dzięki!
Garth Kidd
2
Resumer jest świetny, ale „automatyczne wznawianie transmisji na nextTick” może przynieść niespodzianki, jeśli spodziewasz się, że możesz przekazać strumień nieznanym konsumentom! Miałem kod, który przesyłał strumień treści do pliku, jeśli zapis metadanych w bazie danych się powiódł. To był czai się błąd, zdarzyło się, że udało się, gdy zapis db natychmiast zwrócił sukces! Później przerobiłem rzeczy tak, aby znajdowały się w bloku asynchronicznym i bum, strumień nigdy nie był czytelny. Lekcja: jeśli nie wiesz, kto będzie konsumował Twój strumień, trzymaj się techniki through (). Pause (). Queue ('string'). End ().
Jolly Roger,
1
Spędziłem około 5 godzin na debugowaniu kodu, ponieważ użyłem części podsumowującej tej odpowiedzi. Byłoby wspaniale, gdybyś mógł ... usunąć to
BT,
10

Jest do tego moduł: https://www.npmjs.com/package/string-to-stream

var str = require('string-to-stream')
str('hi there').pipe(process.stdout) // => 'hi there' 
Lori
źródło
1
Czy to gra słów „jest do tego aplikacja”? ;)
masterxilo
1
Link w komentarzu jest przydatny: npmjs.com/package/string-to-stream
Dem Pilafian
FYI Próbowałem użyć tej biblioteki do pisania JSON na dysk Google, ale to nie zadziałało dla mnie. Napisałem artykuł na ten temat tutaj: medium.com/@dupski/… . Dodano również jako odpowiedź poniżej
Russell Briggs
6

w skrypcie kawowym:

class StringStream extends Readable
  constructor: (@str) ->
    super()

  _read: (size) ->
    @push @str
    @push null

Użyj tego:

new StringStream('text here').pipe(stream1).pipe(stream2)
xinthink
źródło
6

Innym rozwiązaniem jest przekazanie funkcji read do konstruktora Readable (patrz opcje readeable w strumieniu doc )

var s = new Readable({read(size) {
    this.push("your string here")
    this.push(null)
  }});

możesz po użyciu na przykład s.pipe

Philippe T.
źródło
jaki jest cel powrotu na koniec?
Kirill Reznikov
„zawsze zwracaj coś (lub nic)”, ten przykład z dokumentacji.
Philippe T.
W JS, jeśli funkcja nie ma zwrotu, jest to odpowiednik pustego zwrotu. Czy mógłbyś podać link tam, gdzie go znalazłeś?
Kirill Reznikov
powinieneś mieć rację. Powiedziałem to bardziej dla najlepszej praktyki. Nie chcę nic zwrócić, to nie pomyłka. Więc usuwam tę linię.
Philippe T.
5

Zmęczyło mnie konieczność ponownego uczenia się tego co sześć miesięcy, więc właśnie opublikowałem moduł npm, aby wyodrębnić szczegóły implementacji:

https://www.npmjs.com/package/streamify-string

To jest sedno modułu:

const Readable = require('stream').Readable;
const util     = require('util');

function Streamify(str, options) {

  if (! (this instanceof Streamify)) {
    return new Streamify(str, options);
  }

  Readable.call(this, options);
  this.str = str;
}

util.inherits(Streamify, Readable);

Streamify.prototype._read = function (size) {

  var chunk = this.str.slice(0, size);

  if (chunk) {
    this.str = this.str.slice(size);
    this.push(chunk);
  }

  else {
    this.push(null);
  }

};

module.exports = Streamify;

strjest tym, stringco musi zostać przekazane do konstruktora przy wywołaniu i zostanie wyprowadzone przez strumień jako dane. optionssą typowymi opcjami, które można przekazać do strumienia, zgodnie z dokumentacją .

Według Travisa CI powinien być kompatybilny z większością wersji node.

Chris Allen Lane
źródło
2
Kiedy to opublikowałem na początku, nie zamieściłem odpowiedniego kodu, co mi powiedziano, jest źle widziane.
Chris Allen Lane,
2

Oto uporządkowane rozwiązanie w języku TypeScript:

import { Readable } from 'stream'

class ReadableString extends Readable {
    private sent = false

    constructor(
        private str: string
    ) {
        super();
    }

    _read() {
        if (!this.sent) {
            this.push(Buffer.from(this.str));
            this.sent = true
        }
        else {
            this.push(null)
        }
    }
}

const stringStream = new ReadableString('string to be streamed...')
Russell Briggs
źródło
1

JavaScript jest wpisywany przez kaczkę, więc jeśli po prostu skopiujesz interfejs API strumienia czytelnego , będzie działał dobrze. W rzeczywistości, większości z tych metod prawdopodobnie nie da się zaimplementować lub po prostu pozostawić je jako kody pośredniczące; wszystko, co musisz zaimplementować, to to, czego używa biblioteka. Możesz użyć wstępnie zbudowanej EventEmitterklasy Node, aby radzić sobie ze zdarzeniami, więc nie musisz sam implementować addListeneri tego typu rzeczy .

Oto jak możesz to zaimplementować w CoffeeScript:

class StringStream extends require('events').EventEmitter
  constructor: (@string) -> super()

  readable: true
  writable: false

  setEncoding: -> throw 'not implemented'
  pause: ->    # nothing to do
  resume: ->   # nothing to do
  destroy: ->  # nothing to do
  pipe: -> throw 'not implemented'

  send: ->
    @emit 'data', @string
    @emit 'end'

Wtedy możesz go użyć w ten sposób:

stream = new StringStream someString
doSomethingWith stream
stream.send()
icktoofay
źródło
Rozumiem: TypeError: string is not a function at String.CALL_NON_FUNCTION (native) kiedy używam go jaknew StringStream(str).send()
pathikrit
To, że JavaScript używa pisania kaczego, nie oznacza, że ​​powinieneś odkrywać koło na nowo. Węzeł już zapewnia implementację dla strumieni. Po prostu utwórz nową instancję, stream.Readabletaką jak zasugerował @Garth Kidd.
Sukima
4
@Sukima: stream.Readable nie istniało, kiedy pisałem tę odpowiedź.
icktoofay