Jaki jest najlepszy sposób ograniczenia współbieżności podczas korzystania z Promise.all () w ES6?

108

Mam kod, który wykonuje iterację po liście, do której odpytano z bazy danych, i wysyła żądanie HTTP dla każdego elementu na tej liście. Ta lista może czasami być dość duża (w tysiącach) i chciałbym się upewnić, że nie trafiam na serwer sieciowy z tysiącami jednoczesnych żądań HTTP.

Skrócona wersja tego kodu obecnie wygląda mniej więcej tak ...

function getCounts() {
  return users.map(user => {
    return new Promise(resolve => {
      remoteServer.getCount(user) // makes an HTTP request
      .then(() => {
        /* snip */
        resolve();
      });
    });
  });
}

Promise.all(getCounts()).then(() => { /* snip */});

Ten kod działa w węźle 4.3.2. Powtarzając, czy Promise.allmożna tak zarządzać, aby tylko pewna liczba obietnic była realizowana w danym momencie?

Chris
źródło
3
Nie zapominaj, że Promise.allzarządza postępem obietnic - obietnice robią to same, Promise.allpo prostu na nie czekają.
Bergi

Odpowiedzi:

53

Zauważ, że Promise.all()nie powoduje to obietnic rozpoczęcia pracy, samo tworzenie obietnic tak.

Mając to na uwadze, jednym z rozwiązań byłoby sprawdzenie, gdy obietnica zostanie rozwiązana, czy należy rozpocząć nową obietnicę, czy też osiągnąłeś już limit.

Jednak tak naprawdę nie ma potrzeby ponownego wynajdywania koła. Jedna biblioteka, której możesz użyć do tego celu, toes6-promise-pool . Z ich przykładów:

// On the Web, leave out this line and use the script tag above instead. 
var PromisePool = require('es6-promise-pool')

var promiseProducer = function () {
  // Your code goes here. 
  // If there is work left to be done, return the next work item as a promise. 
  // Otherwise, return null to indicate that all promises have been created. 
  // Scroll down for an example. 
}

// The number of promises to process simultaneously. 
var concurrency = 3

// Create a pool. 
var pool = new PromisePool(promiseProducer, concurrency)

// Start the pool. 
var poolPromise = pool.start()

// Wait for the pool to settle. 
poolPromise.then(function () {
  console.log('All promises fulfilled')
}, function (error) {
  console.log('Some promise rejected: ' + error.message)
})
Timo
źródło
26
Szkoda, że ​​es6-promise-pool wymyśla na nowo Promise zamiast ich używać. Zamiast tego proponuję to zwięzłe rozwiązanie (jeśli używasz już ES6 lub ES7
Rafael Xavier
3
Przyjrzałem się obu, async-pool wygląda znacznie lepiej! Bardziej proste i lżejsze.
Niekończący się
3
Zauważyłem również, że p-limit jest najprostszą implementacją. Zobacz mój przykład poniżej. stackoverflow.com/a/52262024/8177355
Matthew Rideout
2
Myślę, że small-asyc-pool jest o wiele lepszym, nieinwazyjnym i raczej naturalnym rozwiązaniem ograniczania współbieżności obietnic.
Sunny Tambi
78

Limit P

Porównałem ograniczenie współbieżności obietnicy ze skryptem niestandardowym, bluebird, es6-pulą-obietnicy i limitem p. Uważam, że p-limit ma najprostszą, okrojoną implementację dla tej potrzeby. Zobacz ich dokumentację .

Wymagania

Na przykład zgodność z async

Mój przykład

W tym przykładzie musimy uruchomić funkcję dla każdego adresu URL w tablicy (na przykład może żądanie API). Tutaj to się nazywa fetchData(). Gdybyśmy mieli do przetworzenia tablicę tysięcy elementów, współbieżność z pewnością przydałaby się do oszczędzania zasobów procesora i pamięci.

const pLimit = require('p-limit');

// Example Concurrency of 3 promise at once
const limit = pLimit(3);

let urls = [
    "http://www.exampleone.com/",
    "http://www.exampletwo.com/",
    "http://www.examplethree.com/",
    "http://www.examplefour.com/",
]

// Create an array of our promises using map (fetchData() returns a promise)
let promises = urls.map(url => {

    // wrap the function we are calling in the limit function we defined above
    return limit(() => fetchData(url));
});

(async () => {
    // Only three promises are run at once (as defined above)
    const result = await Promise.all(promises);
    console.log(result);
})();

Wynik dziennika konsoli to tablica danych odpowiedzi na rozwiązane obietnice.

Matthew Rideout
źródło
4
Dzięki za to! Ten jest znacznie prostszy
Jan
3
To była zdecydowanie najlepsza biblioteka, jaką widziałem do ograniczania jednoczesnych żądań. I świetny przykład, dzięki!
Chris Livdahl
3
Dzięki za porównanie. Czy porównałeś z github.com/rxaviers/async-pool ?
ahong
1
Łatwy w użyciu, świetny wybór.
drmrbrewer
1
poza tym jeden ma około 20 milionów pobrań tygodniowo na npm w porównaniu z około 200-100k dla innych bibliotek wymienionych w innych odpowiedziach.
vir nas
25

Za pomocą Array.prototype.splice

while (funcs.length) {
  // 100 at at time
  await Promise.all( funcs.splice(0, 100).map(f => f()) )
}
zwolnił kawior
źródło
3
To niedoceniane rozwiązanie. Uwielbiam prostotę.
Brannon,
12
To uruchamia funkcje w partiach zamiast w puli, gdzie jedna funkcja jest wywoływana natychmiast po zakończeniu innej.
cltsang
Podobało mi się to rozwiązanie!
prasun
zajęło chwilę, aby zrozumieć, co robi przy braku większego kontekstu wokół niego, na przykład jako partia zamiast puli. Zmieniasz kolejność tablicy za każdym razem, gdy łączysz ją od początku lub w środku. (przeglądarka musi ponownie zindeksować wszystkie elementy) teoretyczną wydajnością lepszą alternatywą jest pobranie rzeczy od końca zamiast tego, arr.splice(-100)jeśli zamówienie nie jest zgodne, może możesz odwrócić tablicę: P
Endless
2
Bardzo przydatne do biegania w partiach. Uwaga: następna partia nie rozpocznie się, dopóki bieżąca partia nie zostanie ukończona w 100%.
Casey Dwayne,
24

Jeśli wiesz, jak działają iteratory i jak są one używane, nie potrzebujesz żadnej dodatkowej biblioteki, ponieważ samodzielne zbudowanie własnej współbieżności może stać się bardzo łatwe. Pozwólcie, że zademonstruję:

/* [Symbol.iterator]() is equivalent to .values()
const iterator = [1,2,3][Symbol.iterator]() */
const iterator = [1,2,3].values()


// loop over all items with for..of
for (const x of iterator) {
  console.log('x:', x)
  
  // notices how this loop continues the same iterator
  // and consumes the rest of the iterator, making the
  // outer loop not logging any more x's
  for (const y of iterator) {
    console.log('y:', y)
  }
}

Możemy użyć tego samego iteratora i udostępnić go wszystkim pracownikom.

Gdybyś użył .entries()zamiast .values()ciebie, dostałbyś tablicę 2D, z [[index, value]]którą pokażę poniżej, ze współbieżnością 2

const sleep = t => new Promise(rs => setTimeout(rs, t))

async function doWork(iterator) {
  for (let [index, item] of iterator) {
    await sleep(1000)
    console.log(index + ': ' + item)
  }
}

const iterator = Array.from('abcdefghij').entries()
const workers = new Array(2).fill(iterator).map(doWork)
//    ^--- starts two workers sharing the same iterator

Promise.allSettled(workers).then(() => console.log('done'))

Zaletą tego jest to, że możesz mieć funkcję generatora zamiast mieć wszystko gotowe od razu.


Uwaga: różnica od tego w porównaniu z przykładową pulą asynchroniczną polega na tym, że tworzy ona dwóch pracowników, więc jeśli jeden pracownik zgłosi błąd z jakiegoś powodu w, powiedzmy, indeksie 5, nie powstrzyma to drugiego pracownika od wykonania reszty. Więc przechodzisz od 2 współbieżności do 1 (więc na tym się nie kończy) Więc radzę wychwycić wszystkie błędy wewnątrz doWorkfunkcji

Nieskończony
źródło
To jest niesamowite! Dzięki nieskończone!
user3413723
To zdecydowanie fajne podejście! Tylko upewnij się, że współbieżność nie przekracza długości listy zadań (jeśli i tak zależy Ci na wynikach), ponieważ możesz skończyć z dodatkami!
Kris Oye
Coś, co może być fajniejsze później, to sytuacja, gdy strumienie otrzymają obsługę Readable.from (iterator) . Chrome umożliwił już przenoszenie strumieni . więc można było tworzyć czytelne strumienie i wysyłać je do pracowników sieciowych, a wszystkie z nich korzystałyby z tego samego podstawowego iteratora.
Endless
16

Bluebird za Promise.map można wziąć opcję współbieżności do kontrolowania ile obietnice powinny być uruchomione równolegle. Czasami jest to łatwiejsze niż .alldlatego, że nie musisz tworzyć tablicy obietnicy.

const Promise = require('bluebird')

function getCounts() {
  return Promise.map(users, user => {
    return new Promise(resolve => {
      remoteServer.getCount(user) // makes an HTTP request
      .then(() => {
        /* snip */
        resolve();
       });
    });
  }, {concurrency: 10}); // <---- at most 10 http requests at a time
}
Jingshao Chen
źródło
bluebird jest świetny, jeśli potrzebujesz szybszych obietnic i ~ 18kb dodatkowych śmieci, jeśli używasz go tylko do jednego celu;)
Endless
2
Wszystko zależy od tego, jak ważna jest dla Ciebie jedna rzecz i czy istnieje inny, szybszy / łatwiejszy, lepszy sposób. Typowy kompromis. Wybieram łatwość obsługi i funkcjonalności na kilka kb, ale YMMV.
Jingshao Chen
13

Zamiast używać obietnic do ograniczania żądań http, użyj wbudowanych węzłów http.Agent.maxSockets . Eliminuje to konieczność korzystania z biblioteki lub pisania własnego kodu puli, a dodatkową zaletą jest większa kontrola nad ograniczeniami.

agent.maxSockets

Domyślnie ustawiony na Nieskończoność. Określa, ile współbieżnych gniazd agent może mieć otwartych na źródło. Origin to kombinacja „host: port” lub „host: port: localAddress”.

Na przykład:

var http = require('http');
var agent = new http.Agent({maxSockets: 5}); // 5 concurrent connections per origin
var request = http.request({..., agent: agent}, ...);

Jeśli wysyłasz wiele żądań do tego samego źródła, możesz również ustawić wartość keepAlivetrue (zobacz dokumenty powyżej, aby uzyskać więcej informacji).

tcooc
źródło
12
Mimo to natychmiastowe tworzenie tysięcy zamknięć i łączenie gniazd nie wydaje się być bardzo wydajne?
Bergi
3

Proponuję bibliotekę async-pool: https://github.com/rxaviers/async-pool

npm install tiny-async-pool

Opis:

Uruchamiaj wiele funkcji asynchronicznych i zwracających obietnicę z ograniczoną współbieżnością przy użyciu natywnego ES6 / ES7

asyncPool uruchamia wiele funkcji zwracających obietnice i funkcji asynchronicznych w ograniczonej puli współbieżności. Odrzuca natychmiast, gdy tylko jedna z obietnic odrzuca. Ustala się, gdy spełnią się wszystkie obietnice. Wywołuje funkcję iteratora tak szybko, jak to możliwe (w ramach limitu współbieżności).

Stosowanie:

const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
// Call iterator (i = 1000)
// Call iterator (i = 5000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 1000 finishes
// Call iterator (i = 3000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 3000 finishes
// Call iterator (i = 2000)
// Itaration is complete, wait until running ones complete...
// 5000 finishes
// 2000 finishes
// Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.
Venryx
źródło
1
Pracuje dla mnie. Dzięki. To świetna biblioteka.
Sunny Tambi
2

Można go rozwiązać za pomocą rekursji.

Pomysł polega na tym, że początkowo wysyłasz maksymalną dozwoloną liczbę żądań, a każde z tych żądań powinno rekurencyjnie nadal wysyłać się po zakończeniu.

function batchFetch(urls, concurrentRequestsLimit) {
    return new Promise(resolve => {
        var documents = [];
        var index = 0;

        function recursiveFetch() {
            if (index === urls.length) {
                return;
            }
            fetch(urls[index++]).then(r => {
                documents.push(r.text());
                if (documents.length === urls.length) {
                    resolve(documents);
                } else {
                    recursiveFetch();
                }
            });
        }

        for (var i = 0; i < concurrentRequestsLimit; i++) {
            recursiveFetch();
        }
    });
}

var sources = [
    'http://www.example_1.com/',
    'http://www.example_2.com/',
    'http://www.example_3.com/',
    ...
    'http://www.example_100.com/'
];
batchFetch(sources, 5).then(documents => {
   console.log(documents);
});
Anton Fil
źródło
2

Oto moje rozwiązanie ES7, które jest przyjazne dla kopiowania i wklejania i ma pełną Promise.all()/ map()alternatywną funkcję , z limitem współbieżności.

Podobnie Promise.all()zachowuje kolejność zwrotu, a także rezerwę dla wartości zwrotów niezwiązanych z obietnicą.

Dołączyłem również porównanie różnych implementacji, ponieważ ilustruje to niektóre aspekty, które zostały pominięte w kilku innych rozwiązaniach.

Stosowanie

const asyncFn = delay => new Promise(resolve => setTimeout(() => resolve(), delay));
const args = [30, 20, 15, 10];
await asyncPool(args, arg => asyncFn(arg), 4); // concurrency limit of 4

Realizacja

async function asyncBatch(args, fn, limit = 8) {
  // Copy arguments to avoid side effects
  args = [...args];
  const outs = [];
  while (args.length) {
    const batch = args.splice(0, limit);
    const out = await Promise.all(batch.map(fn));
    outs.push(...out);
  }
  return outs;
}

async function asyncPool(args, fn, limit = 8) {
  return new Promise((resolve) => {
    // Copy arguments to avoid side effect, reverse queue as
    // pop is faster than shift
    const argQueue = [...args].reverse();
    let count = 0;
    const outs = [];
    const pollNext = () => {
      if (argQueue.length === 0 && count === 0) {
        resolve(outs);
      } else {
        while (count < limit && argQueue.length) {
          const index = args.length - argQueue.length;
          const arg = argQueue.pop();
          count += 1;
          const out = fn(arg);
          const processOut = (out, index) => {
            outs[index] = out;
            count -= 1;
            pollNext();
          };
          if (typeof out === 'object' && out.then) {
            out.then(out => processOut(out, index));
          } else {
            processOut(out, index);
          }
        }
      }
    };
    pollNext();
  });
}

Porównanie

// A simple async function that returns after the given delay
// and prints its value to allow us to determine the response order
const asyncFn = delay => new Promise(resolve => setTimeout(() => {
  console.log(delay);
  resolve(delay);
}, delay));

// List of arguments to the asyncFn function
const args = [30, 20, 15, 10];

// As a comparison of the different implementations, a low concurrency
// limit of 2 is used in order to highlight the performance differences.
// If a limit greater than or equal to args.length is used the results
// would be identical.

// Vanilla Promise.all/map combo
const out1 = await Promise.all(args.map(arg => asyncFn(arg)));
// prints: 10, 15, 20, 30
// total time: 30ms

// Pooled implementation
const out2 = await asyncPool(args, arg => asyncFn(arg), 2);
// prints: 20, 30, 15, 10
// total time: 40ms

// Batched implementation
const out3 = await asyncBatch(args, arg => asyncFn(arg), 2);
// prints: 20, 30, 20, 30
// total time: 45ms

console.log(out1, out2, out3); // prints: [30, 20, 15, 10] x 3

// Conclusion: Execution order and performance is different,
// but return order is still identical

Wniosek

asyncPool() powinno być najlepszym rozwiązaniem, ponieważ umożliwia rozpoczęcie nowych żądań zaraz po zakończeniu poprzedniego.

asyncBatch() jest uwzględniony jako porównanie, ponieważ jego implementacja jest prostsza do zrozumienia, ale powinna działać wolniej, ponieważ wszystkie żądania z tej samej partii muszą zostać zakończone, aby rozpocząć następną.

W tym wymyślonym przykładzie wanilia bez ograniczeń Promise.all()jest oczywiście najszybsza, podczas gdy inne mogłyby działać bardziej pożądane w scenariuszu zatoru w świecie rzeczywistym.

Aktualizacja

Biblioteka async-pool, którą inni już zasugerowali, jest prawdopodobnie lepszą alternatywą dla mojej implementacji, ponieważ działa prawie identycznie i ma bardziej zwięzłą implementację dzięki sprytnemu wykorzystaniu Promise.race (): https://github.com/rxaviers/ async-pool / blob / master / lib / es7.js

Mam nadzieję, że moja odpowiedź może nadal mieć wartość edukacyjną.

Adelost
źródło
1

Oto podstawowy przykład przesyłania strumieniowego i „limitu p”. Wysyła strumieniowo strumień odczytu http do bazy danych mongo.

const stream = require('stream');
const util = require('util');
const pLimit = require('p-limit');
const es = require('event-stream');
const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB;


const pipeline = util.promisify(stream.pipeline)

const outputDBConfig = {
    dbURL: 'yr-db-url',
    collection: 'some-collection'
};
const limit = pLimit(3);

async yrAsyncStreamingFunction(readStream) => {
        const mongoWriteStream = streamToMongoDB(outputDBConfig);
        const mapperStream = es.map((data, done) => {
                let someDataPromise = limit(() => yr_async_call_to_somewhere())

                    someDataPromise.then(
                        function handleResolve(someData) {

                            data.someData = someData;    
                            done(null, data);
                        },
                        function handleError(error) {
                            done(error)
                        }
                    );
                })

            await pipeline(
                readStream,
                JSONStream.parse('*'),
                mapperStream,
                mongoWriteStream
            );
        }
gosuer1921
źródło
0

Próbowałem więc zrobić kilka pokazanych przykładów dla mojego kodu, ale ponieważ było to tylko dla skryptu importu, a nie kodu produkcyjnego, użycie obietnic wsadowych pakietu npm z pewnością było dla mnie najłatwiejszą ścieżką

UWAGA: Wymaga środowiska wykonawczego do obsługi Promise lub wypełnienia.

Api batchPromises (int: batchSize, array: Collection, i => Promise: Iteratee) Promise: Iteratee będzie wywoływana po każdej partii.

Posługiwać się:

batch-promises
Easily batch promises

NOTE: Requires runtime to support Promise or to be polyfilled.

Api
batchPromises(int: batchSize, array: Collection, i => Promise: Iteratee)
The Promise: Iteratee will be called after each batch.

Use:
import batchPromises from 'batch-promises';
 
batchPromises(2, [1,2,3,4,5], i => new Promise((resolve, reject) => {
 
  // The iteratee will fire after each batch resulting in the following behaviour:
  // @ 100ms resolve items 1 and 2 (first batch of 2)
  // @ 200ms resolve items 3 and 4 (second batch of 2)
  // @ 300ms resolve remaining item 5 (last remaining batch)
  setTimeout(() => {
    resolve(i);
  }, 100);
}))
.then(results => {
  console.log(results); // [1,2,3,4,5]
});

Agusti Fernandez Pardo
źródło
0

Rekursja jest odpowiedzią, jeśli nie chcesz korzystać z bibliotek zewnętrznych

downloadAll(someArrayWithData){
  var self = this;

  var tracker = function(next){
    return self.someExpensiveRequest(someArrayWithData[next])
    .then(function(){
      next++;//This updates the next in the tracker function parameter
      if(next < someArrayWithData.length){//Did I finish processing all my data?
        return tracker(next);//Go to the next promise
      }
    });
  }

  return tracker(0); 
}
Juan
źródło
0

To jest to, czego użyłem Promise.racew moim kodzie tutaj

const identifyTransactions = async function() {
  let promises = []
  let concurrency = 0
  for (let tx of this.transactions) {
    if (concurrency > 4)
      await Promise.race(promises).then(r => { promises = []; concurrency = 0 })
    promises.push(tx.identifyTransaction())
    concurrency++
  }
  if (promises.length > 0)
    await Promise.race(promises) //resolve the rest
}

Jeśli chcesz zobaczyć przykład: https://jsfiddle.net/thecodermarcelo/av2tp83o/5/

Alex
źródło
2
Nie nazwałbym tej współbieżności ... To bardziej przypomina wykonanie wsadowe ... Wykonujesz 4 zadania, czekasz, aż wszystkie się zakończą, a następnie wykonujesz następne 4. Jeśli jedno z nich zostanie rozwiązane wcześniej, nadal czekasz na zakończenie pozostałych 3 , powinieneś używaćPromise.race
Endless
0
  • @tcooc była całkiem fajna. Nie wiedziałem o tym i wykorzystam to w przyszłości.
  • Podobała mi się również odpowiedź @MatthewRideout , ale używa ona zewnętrznej biblioteki !!

O ile to możliwe, staram się samodzielnie rozwijać tego typu rzeczy, zamiast chodzić do biblioteki. W końcu uczysz się wielu pojęć, które wcześniej wydawały się trudne.

Co sądzicie o tej próbie:
(Bardzo się nad tym zastanawiałem i myślę, że to działa, ale zwróć uwagę, czy tak nie jest lub jest coś zasadniczo nie tak)

 class Pool{
        constructor(maxAsync) {
            this.maxAsync = maxAsync;
            this.asyncOperationsQueue = [];
            this.currentAsyncOperations = 0
        }

        runAnother() {
            if (this.asyncOperationsQueue.length > 0 && this.currentAsyncOperations < this.maxAsync) {
                this.currentAsyncOperations += 1;
                this.asyncOperationsQueue.pop()()
                    .then(() => { this.currentAsyncOperations -= 1; this.runAnother() }, () => { this.currentAsyncOperations -= 1; this.runAnother() })
            }
        }

        add(f){  // the argument f is a function of signature () => Promise
            this.runAnother();
            return new Promise((resolve, reject) => {
                this.asyncOperationsQueue.push(
                    () => f().then(resolve).catch(reject)
                )
            })
        }
    }

//#######################################################
//                        TESTS
//#######################################################

function dbCall(id, timeout, fail) {
    return new Promise((resolve, reject) => {
        setTimeout(() => {
            if (fail) {
               reject(`Error for id ${id}`);
            } else {
                resolve(id);
            }
        }, timeout)
    }
    )
}


const dbQuery1 = () => dbCall(1, 5000, false);
const dbQuery2 = () => dbCall(2, 5000, false);
const dbQuery3 = () => dbCall(3, 5000, false);
const dbQuery4 = () => dbCall(4, 5000, true);
const dbQuery5 = () => dbCall(5, 5000, false);


const cappedPool = new Pool(2);

const dbQuery1Res = cappedPool.add(dbQuery1).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery2Res = cappedPool.add(dbQuery2).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery3Res = cappedPool.add(dbQuery3).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery4Res = cappedPool.add(dbQuery4).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery5Res = cappedPool.add(dbQuery5).catch(i => i).then(i => console.log(`Resolved: ${i}`))

Takie podejście zapewnia przyjemny interfejs API, podobny do puli wątków w scala / java.
Po utworzeniu jednej instancji puli za const cappedPool = new Pool(2)pomocą po prostu składasz jej obietnice cappedPool.add(() => myPromise).
Nieświadomie musimy upewnić się, że obietnica nie rozpocznie się natychmiast i dlatego musimy „leniwie ją udzielać” przy pomocy funkcji.

Co najważniejsze, zwróć uwagę, że wynikiem tej metody add jest Obietnica, która zostanie uzupełniona / rozwiązana z wartością Twojej pierwotnej obietnicy ! Zapewnia to bardzo intuicyjne użytkowanie.

const resultPromise = cappedPool.add( () => dbCall(...))
resultPromise
.then( actualResult => {
   // Do something with the result form the DB
  }
)
Carlos Teixeira
źródło
0

Niestety nie ma sposobu, aby to zrobić z natywnym Promise.all, więc musisz być kreatywny.

Jest to najszybszy i najbardziej zwięzły sposób, jaki mogłem znaleźć bez korzystania z zewnętrznych bibliotek.

Wykorzystuje nowszą funkcję javascript zwaną iteratorem. Iterator w zasadzie śledzi, które elementy zostały przetworzone, a które nie.

Aby użyć go w kodzie, tworzysz tablicę funkcji asynchronicznych. Każda funkcja asynchroniczna prosi ten sam iterator o następny element, który musi zostać przetworzony. Każda funkcja przetwarza swój własny element asynchronicznie, a po zakończeniu prosi iterator o nowy. Gdy w iteratorze skończą się elementy, wszystkie funkcje są zakończone.

Dzięki @Endless za inspirację.

var items = [
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
];

var concurrency = 5

Array(concurrency).fill(items.entries()).map(async (cursor) => {
    for(let [index, url] of cursor){
        console.log("getting url is ", index, url);
        // run your async task instead of this next line
        var text = await fetch(url).then(res => res.text());
        console.log("text is", text.slice(0,20));
    }
})

user3413723
źródło
Ciekawe, dlaczego zostało to zaznaczone. Jest bardzo podobny do tego, co wymyśliłem.
Kris Oye
0

Tyle dobrych rozwiązań. Zacząłem od eleganckiego rozwiązania opublikowanego przez @Endless, a skończyłem na tej małej metodzie rozszerzenia, która nie używa żadnych zewnętrznych bibliotek ani nie działa partiami (chociaż zakłada, że ​​masz funkcje takie jak asynchronizacja itp.):

Promise.allWithLimit = async (taskList, limit = 5) => {
    const iterator = taskList.entries();
    let results = new Array(taskList.length);
    let workerThreads = new Array(limit).fill(0).map(() => 
        new Promise(async (resolve, reject) => {
            try {
                let entry = iterator.next();
                while (!entry.done) {
                    let [index, promise] = entry.value;
                    try {
                        results[index] = await promise;
                        entry = iterator.next();
                    }
                    catch (err) {
                        results[index] = err;
                    }
                }
                // No more work to do
                resolve(true); 
            }
            catch (err) {
                // This worker is dead
                reject(err);
            }
        }));

    await Promise.all(workerThreads);
    return results;
};

    Promise.allWithLimit = async (taskList, limit = 5) => {
        const iterator = taskList.entries();
        let results = new Array(taskList.length);
        let workerThreads = new Array(limit).fill(0).map(() => 
            new Promise(async (resolve, reject) => {
                try {
                    let entry = iterator.next();
                    while (!entry.done) {
                        let [index, promise] = entry.value;
                        try {
                            results[index] = await promise;
                            entry = iterator.next();
                        }
                        catch (err) {
                            results[index] = err;
                        }
                    }
                    // No more work to do
                    resolve(true); 
                }
                catch (err) {
                    // This worker is dead
                    reject(err);
                }
            }));
    
        await Promise.all(workerThreads);
        return results;
    };

    const demoTasks = new Array(10).fill(0).map((v,i) => new Promise(resolve => {
       let n = (i + 1) * 5;
       setTimeout(() => {
          console.log(`Did nothing for ${n} seconds`);
          resolve(n);
       }, n * 1000);
    }));

    var results = Promise.allWithLimit(demoTasks);

Kris Oye
źródło
0

rozwijając odpowiedź przesłaną przez @deceleratedcaviar, stworzyłem funkcję narzędzia „wsadowego”, która przyjmuje jako argument: tablicę wartości, limit współbieżności i funkcję przetwarzania. Tak, zdaję sobie sprawę, że korzystanie z Promise.all w ten sposób jest bardziej zbliżone do przetwarzania wsadowego w porównaniu z prawdziwą współbieżnością, ale jeśli celem jest ograniczenie nadmiernej liczby wywołań HTTP jednocześnie, wybieram to podejście ze względu na jego prostotę i brak konieczności korzystania z zewnętrznej biblioteki .

async function batch(o) {
  let arr = o.arr
  let resp = []
  while (arr.length) {
    let subset = arr.splice(0, o.limit)
    let results = await Promise.all(subset.map(o.process))
    resp.push(results)
  }
  return [].concat.apply([], resp)
}

let arr = []
for (let i = 0; i < 250; i++) { arr.push(i) }

async function calc(val) { return val * 100 }

(async () => {
  let resp = await batch({
    arr: arr,
    limit: 100,
    process: calc
  })
  console.log(resp)
})();

Eugene Blinn
źródło