Mam kod, który wykonuje iterację po liście, do której odpytano z bazy danych, i wysyła żądanie HTTP dla każdego elementu na tej liście. Ta lista może czasami być dość duża (w tysiącach) i chciałbym się upewnić, że nie trafiam na serwer sieciowy z tysiącami jednoczesnych żądań HTTP.
Skrócona wersja tego kodu obecnie wygląda mniej więcej tak ...
function getCounts() {
return users.map(user => {
return new Promise(resolve => {
remoteServer.getCount(user) // makes an HTTP request
.then(() => {
/* snip */
resolve();
});
});
});
}
Promise.all(getCounts()).then(() => { /* snip */});
Ten kod działa w węźle 4.3.2. Powtarzając, czy Promise.all
można tak zarządzać, aby tylko pewna liczba obietnic była realizowana w danym momencie?
javascript
node.js
es6-promise
Chris
źródło
źródło
Promise.all
zarządza postępem obietnic - obietnice robią to same,Promise.all
po prostu na nie czekają.Promise
antywzorca konstruktora !Odpowiedzi:
Zauważ, że
Promise.all()
nie powoduje to obietnic rozpoczęcia pracy, samo tworzenie obietnic tak.Mając to na uwadze, jednym z rozwiązań byłoby sprawdzenie, gdy obietnica zostanie rozwiązana, czy należy rozpocząć nową obietnicę, czy też osiągnąłeś już limit.
Jednak tak naprawdę nie ma potrzeby ponownego wynajdywania koła. Jedna biblioteka, której możesz użyć do tego celu, to
es6-promise-pool
. Z ich przykładów:// On the Web, leave out this line and use the script tag above instead. var PromisePool = require('es6-promise-pool') var promiseProducer = function () { // Your code goes here. // If there is work left to be done, return the next work item as a promise. // Otherwise, return null to indicate that all promises have been created. // Scroll down for an example. } // The number of promises to process simultaneously. var concurrency = 3 // Create a pool. var pool = new PromisePool(promiseProducer, concurrency) // Start the pool. var poolPromise = pool.start() // Wait for the pool to settle. poolPromise.then(function () { console.log('All promises fulfilled') }, function (error) { console.log('Some promise rejected: ' + error.message) })
źródło
Limit P
Porównałem ograniczenie współbieżności obietnicy ze skryptem niestandardowym, bluebird, es6-pulą-obietnicy i limitem p. Uważam, że p-limit ma najprostszą, okrojoną implementację dla tej potrzeby. Zobacz ich dokumentację .
Wymagania
Na przykład zgodność z async
Mój przykład
W tym przykładzie musimy uruchomić funkcję dla każdego adresu URL w tablicy (na przykład może żądanie API). Tutaj to się nazywa
fetchData()
. Gdybyśmy mieli do przetworzenia tablicę tysięcy elementów, współbieżność z pewnością przydałaby się do oszczędzania zasobów procesora i pamięci.const pLimit = require('p-limit'); // Example Concurrency of 3 promise at once const limit = pLimit(3); let urls = [ "http://www.exampleone.com/", "http://www.exampletwo.com/", "http://www.examplethree.com/", "http://www.examplefour.com/", ] // Create an array of our promises using map (fetchData() returns a promise) let promises = urls.map(url => { // wrap the function we are calling in the limit function we defined above return limit(() => fetchData(url)); }); (async () => { // Only three promises are run at once (as defined above) const result = await Promise.all(promises); console.log(result); })();
Wynik dziennika konsoli to tablica danych odpowiedzi na rozwiązane obietnice.
źródło
Za pomocą
Array.prototype.splice
while (funcs.length) { // 100 at at time await Promise.all( funcs.splice(0, 100).map(f => f()) ) }
źródło
arr.splice(-100)
jeśli zamówienie nie jest zgodne, może możesz odwrócić tablicę: PJeśli wiesz, jak działają iteratory i jak są one używane, nie potrzebujesz żadnej dodatkowej biblioteki, ponieważ samodzielne zbudowanie własnej współbieżności może stać się bardzo łatwe. Pozwólcie, że zademonstruję:
/* [Symbol.iterator]() is equivalent to .values() const iterator = [1,2,3][Symbol.iterator]() */ const iterator = [1,2,3].values() // loop over all items with for..of for (const x of iterator) { console.log('x:', x) // notices how this loop continues the same iterator // and consumes the rest of the iterator, making the // outer loop not logging any more x's for (const y of iterator) { console.log('y:', y) } }
Możemy użyć tego samego iteratora i udostępnić go wszystkim pracownikom.
Gdybyś użył
.entries()
zamiast.values()
ciebie, dostałbyś tablicę 2D, z[[index, value]]
którą pokażę poniżej, ze współbieżnością 2const sleep = t => new Promise(rs => setTimeout(rs, t)) async function doWork(iterator) { for (let [index, item] of iterator) { await sleep(1000) console.log(index + ': ' + item) } } const iterator = Array.from('abcdefghij').entries() const workers = new Array(2).fill(iterator).map(doWork) // ^--- starts two workers sharing the same iterator Promise.allSettled(workers).then(() => console.log('done'))
Zaletą tego jest to, że możesz mieć funkcję generatora zamiast mieć wszystko gotowe od razu.
Uwaga: różnica od tego w porównaniu z przykładową pulą asynchroniczną polega na tym, że tworzy ona dwóch pracowników, więc jeśli jeden pracownik zgłosi błąd z jakiegoś powodu w, powiedzmy, indeksie 5, nie powstrzyma to drugiego pracownika od wykonania reszty. Więc przechodzisz od 2 współbieżności do 1 (więc na tym się nie kończy) Więc radzę wychwycić wszystkie błędy wewnątrz
doWork
funkcjiźródło
Bluebird za Promise.map można wziąć opcję współbieżności do kontrolowania ile obietnice powinny być uruchomione równolegle. Czasami jest to łatwiejsze niż
.all
dlatego, że nie musisz tworzyć tablicy obietnicy.const Promise = require('bluebird') function getCounts() { return Promise.map(users, user => { return new Promise(resolve => { remoteServer.getCount(user) // makes an HTTP request .then(() => { /* snip */ resolve(); }); }); }, {concurrency: 10}); // <---- at most 10 http requests at a time }
źródło
Zamiast używać obietnic do ograniczania żądań http, użyj wbudowanych węzłów http.Agent.maxSockets . Eliminuje to konieczność korzystania z biblioteki lub pisania własnego kodu puli, a dodatkową zaletą jest większa kontrola nad ograniczeniami.
Na przykład:
var http = require('http'); var agent = new http.Agent({maxSockets: 5}); // 5 concurrent connections per origin var request = http.request({..., agent: agent}, ...);
Jeśli wysyłasz wiele żądań do tego samego źródła, możesz również ustawić wartość
keepAlive
true (zobacz dokumenty powyżej, aby uzyskać więcej informacji).źródło
Proponuję bibliotekę async-pool: https://github.com/rxaviers/async-pool
npm install tiny-async-pool
Opis:
Stosowanie:
const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i)); await asyncPool(2, [1000, 5000, 3000, 2000], timeout); // Call iterator (i = 1000) // Call iterator (i = 5000) // Pool limit of 2 reached, wait for the quicker one to complete... // 1000 finishes // Call iterator (i = 3000) // Pool limit of 2 reached, wait for the quicker one to complete... // 3000 finishes // Call iterator (i = 2000) // Itaration is complete, wait until running ones complete... // 5000 finishes // 2000 finishes // Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.
źródło
Można go rozwiązać za pomocą rekursji.
Pomysł polega na tym, że początkowo wysyłasz maksymalną dozwoloną liczbę żądań, a każde z tych żądań powinno rekurencyjnie nadal wysyłać się po zakończeniu.
function batchFetch(urls, concurrentRequestsLimit) { return new Promise(resolve => { var documents = []; var index = 0; function recursiveFetch() { if (index === urls.length) { return; } fetch(urls[index++]).then(r => { documents.push(r.text()); if (documents.length === urls.length) { resolve(documents); } else { recursiveFetch(); } }); } for (var i = 0; i < concurrentRequestsLimit; i++) { recursiveFetch(); } }); } var sources = [ 'http://www.example_1.com/', 'http://www.example_2.com/', 'http://www.example_3.com/', ... 'http://www.example_100.com/' ]; batchFetch(sources, 5).then(documents => { console.log(documents); });
źródło
Oto moje rozwiązanie ES7, które jest przyjazne dla kopiowania i wklejania i ma pełną
Promise.all()
/map()
alternatywną funkcję , z limitem współbieżności.Podobnie
Promise.all()
zachowuje kolejność zwrotu, a także rezerwę dla wartości zwrotów niezwiązanych z obietnicą.Dołączyłem również porównanie różnych implementacji, ponieważ ilustruje to niektóre aspekty, które zostały pominięte w kilku innych rozwiązaniach.
Stosowanie
const asyncFn = delay => new Promise(resolve => setTimeout(() => resolve(), delay)); const args = [30, 20, 15, 10]; await asyncPool(args, arg => asyncFn(arg), 4); // concurrency limit of 4
Realizacja
async function asyncBatch(args, fn, limit = 8) { // Copy arguments to avoid side effects args = [...args]; const outs = []; while (args.length) { const batch = args.splice(0, limit); const out = await Promise.all(batch.map(fn)); outs.push(...out); } return outs; } async function asyncPool(args, fn, limit = 8) { return new Promise((resolve) => { // Copy arguments to avoid side effect, reverse queue as // pop is faster than shift const argQueue = [...args].reverse(); let count = 0; const outs = []; const pollNext = () => { if (argQueue.length === 0 && count === 0) { resolve(outs); } else { while (count < limit && argQueue.length) { const index = args.length - argQueue.length; const arg = argQueue.pop(); count += 1; const out = fn(arg); const processOut = (out, index) => { outs[index] = out; count -= 1; pollNext(); }; if (typeof out === 'object' && out.then) { out.then(out => processOut(out, index)); } else { processOut(out, index); } } } }; pollNext(); }); }
Porównanie
// A simple async function that returns after the given delay // and prints its value to allow us to determine the response order const asyncFn = delay => new Promise(resolve => setTimeout(() => { console.log(delay); resolve(delay); }, delay)); // List of arguments to the asyncFn function const args = [30, 20, 15, 10]; // As a comparison of the different implementations, a low concurrency // limit of 2 is used in order to highlight the performance differences. // If a limit greater than or equal to args.length is used the results // would be identical. // Vanilla Promise.all/map combo const out1 = await Promise.all(args.map(arg => asyncFn(arg))); // prints: 10, 15, 20, 30 // total time: 30ms // Pooled implementation const out2 = await asyncPool(args, arg => asyncFn(arg), 2); // prints: 20, 30, 15, 10 // total time: 40ms // Batched implementation const out3 = await asyncBatch(args, arg => asyncFn(arg), 2); // prints: 20, 30, 20, 30 // total time: 45ms console.log(out1, out2, out3); // prints: [30, 20, 15, 10] x 3 // Conclusion: Execution order and performance is different, // but return order is still identical
Wniosek
asyncPool()
powinno być najlepszym rozwiązaniem, ponieważ umożliwia rozpoczęcie nowych żądań zaraz po zakończeniu poprzedniego.asyncBatch()
jest uwzględniony jako porównanie, ponieważ jego implementacja jest prostsza do zrozumienia, ale powinna działać wolniej, ponieważ wszystkie żądania z tej samej partii muszą zostać zakończone, aby rozpocząć następną.W tym wymyślonym przykładzie wanilia bez ograniczeń
Promise.all()
jest oczywiście najszybsza, podczas gdy inne mogłyby działać bardziej pożądane w scenariuszu zatoru w świecie rzeczywistym.Aktualizacja
Biblioteka async-pool, którą inni już zasugerowali, jest prawdopodobnie lepszą alternatywą dla mojej implementacji, ponieważ działa prawie identycznie i ma bardziej zwięzłą implementację dzięki sprytnemu wykorzystaniu Promise.race (): https://github.com/rxaviers/ async-pool / blob / master / lib / es7.js
Mam nadzieję, że moja odpowiedź może nadal mieć wartość edukacyjną.
źródło
Oto podstawowy przykład przesyłania strumieniowego i „limitu p”. Wysyła strumieniowo strumień odczytu http do bazy danych mongo.
const stream = require('stream'); const util = require('util'); const pLimit = require('p-limit'); const es = require('event-stream'); const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB; const pipeline = util.promisify(stream.pipeline) const outputDBConfig = { dbURL: 'yr-db-url', collection: 'some-collection' }; const limit = pLimit(3); async yrAsyncStreamingFunction(readStream) => { const mongoWriteStream = streamToMongoDB(outputDBConfig); const mapperStream = es.map((data, done) => { let someDataPromise = limit(() => yr_async_call_to_somewhere()) someDataPromise.then( function handleResolve(someData) { data.someData = someData; done(null, data); }, function handleError(error) { done(error) } ); }) await pipeline( readStream, JSONStream.parse('*'), mapperStream, mongoWriteStream ); }
źródło
Próbowałem więc zrobić kilka pokazanych przykładów dla mojego kodu, ale ponieważ było to tylko dla skryptu importu, a nie kodu produkcyjnego, użycie obietnic wsadowych pakietu npm z pewnością było dla mnie najłatwiejszą ścieżką
UWAGA: Wymaga środowiska wykonawczego do obsługi Promise lub wypełnienia.
Api batchPromises (int: batchSize, array: Collection, i => Promise: Iteratee) Promise: Iteratee będzie wywoływana po każdej partii.
Posługiwać się:
batch-promises Easily batch promises NOTE: Requires runtime to support Promise or to be polyfilled. Api batchPromises(int: batchSize, array: Collection, i => Promise: Iteratee) The Promise: Iteratee will be called after each batch. Use: import batchPromises from 'batch-promises'; batchPromises(2, [1,2,3,4,5], i => new Promise((resolve, reject) => { // The iteratee will fire after each batch resulting in the following behaviour: // @ 100ms resolve items 1 and 2 (first batch of 2) // @ 200ms resolve items 3 and 4 (second batch of 2) // @ 300ms resolve remaining item 5 (last remaining batch) setTimeout(() => { resolve(i); }, 100); })) .then(results => { console.log(results); // [1,2,3,4,5] });
źródło
Rekursja jest odpowiedzią, jeśli nie chcesz korzystać z bibliotek zewnętrznych
downloadAll(someArrayWithData){ var self = this; var tracker = function(next){ return self.someExpensiveRequest(someArrayWithData[next]) .then(function(){ next++;//This updates the next in the tracker function parameter if(next < someArrayWithData.length){//Did I finish processing all my data? return tracker(next);//Go to the next promise } }); } return tracker(0); }
źródło
To jest to, czego użyłem
Promise.race
w moim kodzie tutajconst identifyTransactions = async function() { let promises = [] let concurrency = 0 for (let tx of this.transactions) { if (concurrency > 4) await Promise.race(promises).then(r => { promises = []; concurrency = 0 }) promises.push(tx.identifyTransaction()) concurrency++ } if (promises.length > 0) await Promise.race(promises) //resolve the rest }
Jeśli chcesz zobaczyć przykład: https://jsfiddle.net/thecodermarcelo/av2tp83o/5/
źródło
Promise.race
O ile to możliwe, staram się samodzielnie rozwijać tego typu rzeczy, zamiast chodzić do biblioteki. W końcu uczysz się wielu pojęć, które wcześniej wydawały się trudne.
Co sądzicie o tej próbie:
(Bardzo się nad tym zastanawiałem i myślę, że to działa, ale zwróć uwagę, czy tak nie jest lub jest coś zasadniczo nie tak)
class Pool{ constructor(maxAsync) { this.maxAsync = maxAsync; this.asyncOperationsQueue = []; this.currentAsyncOperations = 0 } runAnother() { if (this.asyncOperationsQueue.length > 0 && this.currentAsyncOperations < this.maxAsync) { this.currentAsyncOperations += 1; this.asyncOperationsQueue.pop()() .then(() => { this.currentAsyncOperations -= 1; this.runAnother() }, () => { this.currentAsyncOperations -= 1; this.runAnother() }) } } add(f){ // the argument f is a function of signature () => Promise this.runAnother(); return new Promise((resolve, reject) => { this.asyncOperationsQueue.push( () => f().then(resolve).catch(reject) ) }) } } //####################################################### // TESTS //####################################################### function dbCall(id, timeout, fail) { return new Promise((resolve, reject) => { setTimeout(() => { if (fail) { reject(`Error for id ${id}`); } else { resolve(id); } }, timeout) } ) } const dbQuery1 = () => dbCall(1, 5000, false); const dbQuery2 = () => dbCall(2, 5000, false); const dbQuery3 = () => dbCall(3, 5000, false); const dbQuery4 = () => dbCall(4, 5000, true); const dbQuery5 = () => dbCall(5, 5000, false); const cappedPool = new Pool(2); const dbQuery1Res = cappedPool.add(dbQuery1).catch(i => i).then(i => console.log(`Resolved: ${i}`)) const dbQuery2Res = cappedPool.add(dbQuery2).catch(i => i).then(i => console.log(`Resolved: ${i}`)) const dbQuery3Res = cappedPool.add(dbQuery3).catch(i => i).then(i => console.log(`Resolved: ${i}`)) const dbQuery4Res = cappedPool.add(dbQuery4).catch(i => i).then(i => console.log(`Resolved: ${i}`)) const dbQuery5Res = cappedPool.add(dbQuery5).catch(i => i).then(i => console.log(`Resolved: ${i}`))
Takie podejście zapewnia przyjemny interfejs API, podobny do puli wątków w scala / java.
Po utworzeniu jednej instancji puli za
const cappedPool = new Pool(2)
pomocą po prostu składasz jej obietnicecappedPool.add(() => myPromise)
.Nieświadomie musimy upewnić się, że obietnica nie rozpocznie się natychmiast i dlatego musimy „leniwie ją udzielać” przy pomocy funkcji.
Co najważniejsze, zwróć uwagę, że wynikiem tej metody
add
jest Obietnica, która zostanie uzupełniona / rozwiązana z wartością Twojej pierwotnej obietnicy ! Zapewnia to bardzo intuicyjne użytkowanie.const resultPromise = cappedPool.add( () => dbCall(...)) resultPromise .then( actualResult => { // Do something with the result form the DB } )
źródło
Niestety nie ma sposobu, aby to zrobić z natywnym Promise.all, więc musisz być kreatywny.
Jest to najszybszy i najbardziej zwięzły sposób, jaki mogłem znaleźć bez korzystania z zewnętrznych bibliotek.
Wykorzystuje nowszą funkcję javascript zwaną iteratorem. Iterator w zasadzie śledzi, które elementy zostały przetworzone, a które nie.
Aby użyć go w kodzie, tworzysz tablicę funkcji asynchronicznych. Każda funkcja asynchroniczna prosi ten sam iterator o następny element, który musi zostać przetworzony. Każda funkcja przetwarza swój własny element asynchronicznie, a po zakończeniu prosi iterator o nowy. Gdy w iteratorze skończą się elementy, wszystkie funkcje są zakończone.
Dzięki @Endless za inspirację.
var items = [ "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", ]; var concurrency = 5 Array(concurrency).fill(items.entries()).map(async (cursor) => { for(let [index, url] of cursor){ console.log("getting url is ", index, url); // run your async task instead of this next line var text = await fetch(url).then(res => res.text()); console.log("text is", text.slice(0,20)); } })
źródło
Tyle dobrych rozwiązań. Zacząłem od eleganckiego rozwiązania opublikowanego przez @Endless, a skończyłem na tej małej metodzie rozszerzenia, która nie używa żadnych zewnętrznych bibliotek ani nie działa partiami (chociaż zakłada, że masz funkcje takie jak asynchronizacja itp.):
Promise.allWithLimit = async (taskList, limit = 5) => { const iterator = taskList.entries(); let results = new Array(taskList.length); let workerThreads = new Array(limit).fill(0).map(() => new Promise(async (resolve, reject) => { try { let entry = iterator.next(); while (!entry.done) { let [index, promise] = entry.value; try { results[index] = await promise; entry = iterator.next(); } catch (err) { results[index] = err; } } // No more work to do resolve(true); } catch (err) { // This worker is dead reject(err); } })); await Promise.all(workerThreads); return results; };
Promise.allWithLimit = async (taskList, limit = 5) => { const iterator = taskList.entries(); let results = new Array(taskList.length); let workerThreads = new Array(limit).fill(0).map(() => new Promise(async (resolve, reject) => { try { let entry = iterator.next(); while (!entry.done) { let [index, promise] = entry.value; try { results[index] = await promise; entry = iterator.next(); } catch (err) { results[index] = err; } } // No more work to do resolve(true); } catch (err) { // This worker is dead reject(err); } })); await Promise.all(workerThreads); return results; }; const demoTasks = new Array(10).fill(0).map((v,i) => new Promise(resolve => { let n = (i + 1) * 5; setTimeout(() => { console.log(`Did nothing for ${n} seconds`); resolve(n); }, n * 1000); })); var results = Promise.allWithLimit(demoTasks);
źródło
rozwijając odpowiedź przesłaną przez @deceleratedcaviar, stworzyłem funkcję narzędzia „wsadowego”, która przyjmuje jako argument: tablicę wartości, limit współbieżności i funkcję przetwarzania. Tak, zdaję sobie sprawę, że korzystanie z Promise.all w ten sposób jest bardziej zbliżone do przetwarzania wsadowego w porównaniu z prawdziwą współbieżnością, ale jeśli celem jest ograniczenie nadmiernej liczby wywołań HTTP jednocześnie, wybieram to podejście ze względu na jego prostotę i brak konieczności korzystania z zewnętrznej biblioteki .
async function batch(o) { let arr = o.arr let resp = [] while (arr.length) { let subset = arr.splice(0, o.limit) let results = await Promise.all(subset.map(o.process)) resp.push(results) } return [].concat.apply([], resp) } let arr = [] for (let i = 0; i < 250; i++) { arr.push(i) } async function calc(val) { return val * 100 } (async () => { let resp = await batch({ arr: arr, limit: 100, process: calc }) console.log(resp) })();
źródło