Prześlij strumień do s3.upload ()

89

Obecnie korzystam z wtyczki node.js o nazwie s3-upload-stream do przesyłania strumieniowego bardzo dużych plików do Amazon S3. Korzysta z wieloczęściowego API iw większości działa bardzo dobrze.

Jednak ten moduł pokazuje swój wiek i musiałem już wprowadzić do niego modyfikacje (autor również go wycofał). Dzisiaj natknąłem się na inny problem z Amazon i naprawdę chciałbym skorzystać z rekomendacji autora i zacząć używać oficjalnego aws-sdk do przesyłania plików.

ALE.

Wydaje się, że oficjalny pakiet SDK nie obsługuje przesyłania strumieniowego do s3.upload() . Naturą s3.upload jest to, że musisz przekazać czytelny strumień jako argument do konstruktora S3.

Mam około 120+ modułów kodu użytkownika, które przetwarzają różne pliki i są agnostykami w stosunku do ostatecznego miejsca docelowego ich wyjścia. Silnik podaje im potokowalny, zapisywalny strumień wyjściowy, a oni przesyłają do niego potokiem. Nie mogę podać im AWS.S3obiektu i poprosić o wywołanie upload()go bez dodawania kodu do wszystkich modułów. Powodem, dla którego użyłem, s3-upload-streambyło to, że obsługuje on orurowanie.

Czy istnieje sposób na zrobienie s3.upload()czegoś z aws-sdk, do którego mogę przesłać strumień?

womp
źródło

Odpowiedzi:

132

Owiń funkcję S3 upload()plikiem node.jsstream.PassThrough() strumieniem

Oto przykład:

inputStream
  .pipe(uploadFromStream(s3));

function uploadFromStream(s3) {
  var pass = new stream.PassThrough();

  var params = {Bucket: BUCKET, Key: KEY, Body: pass};
  s3.upload(params, function(err, data) {
    console.log(err, data);
  });

  return pass;
}
Casey Benko
źródło
2
Świetnie, to rozwiązało mój bardzo brzydki hack = -) Czy możesz wyjaśnić, co właściwie robi stream.PassThrough ()?
mraxus
6
Czy Twój strumień PassThrough zostaje zamknięty, gdy to zrobisz? Mam cholernie dużo czasu na propegowanie zamknięcia w s3.upload, aby trafić w mój strumień PassThrough.
czwarta43
7
rozmiar przesyłanego pliku to 0 bajtów. Jeśli przepuszczę te same dane ze strumienia źródłowego do systemu plików, wszystko działa dobrze. Dowolny pomysł?
Radar155
3
Strumień przejściowy pobierze zapisane do niego bajty i wyprowadzi je. Pozwala to zwrócić zapisywalny strumień, z którego będzie odczytywał aws-sdk podczas pisania do niego. Zwróciłbym również obiekt odpowiedzi z s3.upload (), ponieważ w przeciwnym razie nie możesz upewnić się, że przesyłanie się zakończyło.
rebot
1
skąd s3parametry wewnątrz potoku i streamskąd pochodzą?
Blackjack
94

Trochę późna odpowiedź, mam nadzieję, że pomoże to komuś innemu. Możesz zwrócić zarówno strumień z możliwością zapisu, jak i obietnicę, dzięki czemu możesz uzyskać dane odpowiedzi po zakończeniu przesyłania.

const AWS = require('aws-sdk');
const stream = require('stream');

const uploadStream = ({ Bucket, Key }) => {
  const s3 = new AWS.S3();
  const pass = new stream.PassThrough();
  return {
    writeStream: pass,
    promise: s3.upload({ Bucket, Key, Body: pass }).promise(),
  };
}

Możesz użyć tej funkcji w następujący sposób:

const { writeStream, promise } = uploadStream({Bucket: 'yourbucket', Key: 'yourfile.mp4'});
const readStream = fs.createReadStream('/path/to/yourfile.mp4');

const pipeline = readStream.pipe(writeStream);

Teraz możesz sprawdzić obietnicę:

promise.then(() => {
  console.log('upload completed successfully');
}).catch((err) => {
  console.log('upload failed.', err.message);
});

Lub jako stream.pipe()zwraca stream.Writable, miejsce docelowe (zmienna writeStream powyżej), pozwalając na łańcuch potoków, możemy również użyć jego zdarzeń:

 pipeline.on('close', () => {
   console.log('upload successful');
 });
 pipeline.on('error', (err) => {
   console.log('upload failed', err.message)
 });
Ahmet Cetin
źródło
Wygląda świetnie, ale z mojej strony
pojawia
właśnie odpowiedział na Twoje pytanie. mam nadzieję, że to pomoże.
Ahmet Cetin
48

W zaakceptowanej odpowiedzi funkcja kończy się przed zakończeniem przesyłania, a więc jest niepoprawna. Poniższy kod prawidłowo przekazuje strumień z czytelnego strumienia.

Prześlij odniesienie

async function uploadReadableStream(stream) {
  const params = {Bucket: bucket, Key: key, Body: stream};
  return s3.upload(params).promise();
}

async function upload() {
  const readable = getSomeReadableStream();
  const results = await uploadReadableStream(readable);
  console.log('upload complete', results);
}

Możesz również pójść o krok dalej i wyświetlić informacje o postępie, używając ManagedUploadw ten sposób:

const manager = s3.upload(params);
manager.on('httpUploadProgress', (progress) => {
  console.log('progress', progress) // { loaded: 4915, total: 192915, part: 1, key: 'foo.jpg' }
});

Dokumentacja ManagedUpload

Lista dostępnych wydarzeń

tsuz
źródło
1
aws-sdk oferuje teraz obietnice wbudowane w 2.3.0+, więc nie musisz już ich podnosić. s3.upload (params) .promise (). then (data => data) .catch (error => error);
DBrown
1
@DBrown Dzięki za wskaźnik! Odpowiednio zaktualizowałem odpowiedź.
tsuz
1
@tsuz, próbując wdrożyć twoje rozwiązanie, daj mi błąd: masz TypeError: dest.on is not a functionjakiś pomysł, dlaczego?
FireBrand
Co to jest dest.on? Czy możesz pokazać przykład? @FireBrand
tsuz
9
To mówi, że zaakceptowana odpowiedź jest niekompletna, ale nie działa z potokowaniem do s3.upload, jak wskazano w zaktualizowanym poście @ Womp. Byłoby bardzo pomocne, gdyby ta odpowiedź została zaktualizowana, aby pobierać wyjście potokowe z czegoś innego!
MattW
6

Żadna z odpowiedzi nie zadziałała dla mnie, ponieważ chciałem:

  • Przepuść do s3.upload()
  • Prześlij wynik s3.upload()do innego strumienia

Przyjęta odpowiedź nie robi tego drugiego. Pozostałe opierają się na obiecującym interfejsie API, który jest kłopotliwy w pracy z rurami strumieniowymi.

To jest moja modyfikacja zaakceptowanej odpowiedzi.

const s3 = new S3();

function writeToS3({Key, Bucket}) {
  const Body = new stream.PassThrough();

  s3.upload({
    Body,
    Key,
    Bucket: process.env.adpBucket
  })
   .on('httpUploadProgress', progress => {
       console.log('progress', progress);
   })
   .send((err, data) => {
     if (err) {
       Body.destroy(err);
     } else {
       console.log(`File uploaded and available at ${data.Location}`);
       Body.destroy();
     }
  });

  return Body;
}

const pipeline = myReadableStream.pipe(writeToS3({Key, Bucket});

pipeline.on('close', () => {
  // upload finished, do something else
})
pipeline.on('error', () => {
  // upload wasn't successful. Handle it
})

cortopy
źródło
Wygląda świetnie, ale z mojej strony
pojawia
5

Rozwiązanie typu skrypt: w
tym przykładzie użyto:

import * as AWS from "aws-sdk";
import * as fsExtra from "fs-extra";
import * as zlib from "zlib";
import * as stream from "stream";

I funkcja asynchroniczna:

public async saveFile(filePath: string, s3Bucket: AWS.S3, key: string, bucketName: string): Promise<boolean> { 

         const uploadStream = (S3: AWS.S3, Bucket: string, Key: string) => {
            const passT = new stream.PassThrough();
            return {
              writeStream: passT,
              promise: S3.upload({ Bucket, Key, Body: passT }).promise(),
            };
          };
        const { writeStream, promise } = uploadStream(s3Bucket, bucketName, key);
        fsExtra.createReadStream(filePath).pipe(writeStream);     //  NOTE: Addition You can compress to zip by  .pipe(zlib.createGzip()).pipe(writeStream)
        let output = true;
        await promise.catch((reason)=> { output = false; console.log(reason);});
        return output;
}

Wywołaj tę metodę gdzieś:

let result = await saveFileToS3(testFilePath, someS3Bucket, someKey, someBucketName);
dzole vladimirov
źródło
4

Rzecz, na którą należy zwrócić uwagę w najczęściej akceptowanej odpowiedzi powyżej, jest taka, że: Musisz zwrócić przepustkę w funkcji, jeśli używasz potoku,

fs.createReadStream(<filePath>).pipe(anyUploadFunction())

function anyUploadFunction () { 
 let pass = new stream.PassThrough();
 return pass // <- Returning this pass is important for the stream to understand where it needs to write to.
}

W przeciwnym razie po cichu przejdzie do następnej bez zgłaszania błędu lub zgłosi błąd w TypeError: dest.on is not a functionzależności od tego, jak napisałeś funkcję

varun bhaya
źródło
3

Jeśli pomoże to komuś, kogo udało mi się pomyślnie przesyłać strumieniowo z klienta do s3:

https://gist.github.com/mattlockyer/532291b6194f6d9ca40cb82564db9d2a

Kod po stronie serwera zakłada, że reqjest to obiekt strumienia, w moim przypadku został wysłany od klienta z informacjami o pliku ustawionymi w nagłówkach.

const fileUploadStream = (req, res) => {
  //get "body" args from header
  const { id, fn } = JSON.parse(req.get('body'));
  const Key = id + '/' + fn; //upload to s3 folder "id" with filename === fn
  const params = {
    Key,
    Bucket: bucketName, //set somewhere
    Body: req, //req is a stream
  };
  s3.upload(params, (err, data) => {
    if (err) {
      res.send('Error Uploading Data: ' + JSON.stringify(err) + '\n' + JSON.stringify(err.stack));
    } else {
      res.send(Key);
    }
  });
};

Tak, to łamie konwencję, ale jeśli spojrzysz na sedno, jest znacznie czystszy niż cokolwiek innego, co znalazłem, używając multer, busboy itp ...

+1 za pragmatyzm i podziękowania dla @SalehenRahman za jego pomoc.

mattdlockyer
źródło
multer, busboy obsługuje przesyłanie danych wieloczęściowych / formularzy. req jako strumień działa, gdy klient wysyła bufor jako treść z XMLHttpRequest.
André Werlang
Aby wyjaśnić, przesyłanie odbywa się z zaplecza, a nie klienta, prawda?
numX
Tak, to „przepuszcza” strumień na zapleczu, ale pochodzi z frontendu
mattdlockyer
3

Dla tych, którzy narzekają, że kiedy używają funkcji przesyłania s3 api i plik o zerowym bajcie kończy się na s3 (@ Radar155 i @gabo) - też miałem ten problem.

Utwórz drugi strumień PassThrough i po prostu potokuj wszystkie dane z pierwszego do drugiego i przekaż odwołanie do tego drugiego do s3. Możesz to zrobić na kilka różnych sposobów - prawdopodobnie brudnym sposobem jest nasłuchiwanie zdarzenia „data” w pierwszym strumieniu, a następnie zapisywanie tych samych danych w drugim strumieniu - podobnie w przypadku zdarzenia „end” - wystarczy wywołać funkcja końca w drugim strumieniu. Nie mam pojęcia, czy jest to błąd w aws api, wersja węzła czy inny problem - ale udało mi się obejść ten problem.

Oto jak to może wyglądać:

var PassThroughStream = require('stream').PassThrough;
var srcStream = new PassThroughStream();

var rstream = fs.createReadStream('Learning/stocktest.json');
var sameStream = rstream.pipe(srcStream);
// interesting note: (srcStream == sameStream) at this point
var destStream = new PassThroughStream();
// call your s3.upload function here - passing in the destStream as the Body parameter
srcStream.on('data', function (chunk) {
    destStream.write(chunk);
});

srcStream.on('end', function () {
    dataStream.end();
});
Tim
źródło
To również działało dla mnie. Funkcja wysyłania S3 po prostu „umierała” po cichu za każdym razem, gdy użyto przesyłania wieloczęściowego, ale podczas korzystania z rozwiązania działała dobrze (!). Dzięki! :)
jhdrn
Czy możesz powiedzieć, dlaczego potrzebny jest drugi strumień?
noob7
1

Podążając za innymi odpowiedziami i używając najnowszego AWS SDK dla Node.js, istnieje znacznie czystsze i prostsze rozwiązanie, ponieważ funkcja s3 upload () akceptuje strumień, używając składni await i obietnicy S3:

var model = await s3Client.upload({
    Bucket : bucket,
    Key : key,
    ContentType : yourContentType,
    Body : fs.createReadStream(path-to-file)
}).promise();
emich
źródło
0

Używam KnexJS i miałem problem z używaniem ich strumieniowego API. W końcu to naprawiłem, mam nadzieję, że poniższe wskazówki pomogą komuś.

const knexStream = knex.select('*').from('my_table').stream();
const passThroughStream = new stream.PassThrough();

knexStream.on('data', (chunk) => passThroughStream.write(JSON.stringify(chunk) + '\n'));
knexStream.on('end', () => passThroughStream.end());

const uploadResult = await s3
  .upload({
    Bucket: 'my-bucket',
    Key: 'stream-test.txt',
    Body: passThroughStream
  })
  .promise();
TestWell
źródło
-3

Jeśli znasz rozmiar strumienia, możesz użyć minio-js, aby przesłać strumień w następujący sposób:

  s3Client.putObject('my-bucketname', 'my-objectname.ogg', stream, size, 'audio/ogg', function(e) {
    if (e) {
      return console.log(e)
    }
    console.log("Successfully uploaded the stream")
  })
Krishna Srinivas
źródło