Jak ograniczyć liczbę współbieżnych asynchronicznych operacji we / wy?

115
// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
urls.AsParallel().ForAll(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
});

Oto problem, uruchamia ponad 1000 jednoczesnych żądań internetowych. Czy istnieje łatwy sposób ograniczenia równoczesnej liczby tych asynchronicznych żądań http? Tak, aby w danym momencie pobieranych było nie więcej niż 20 stron internetowych. Jak to zrobić najefektywniej?

Grief Coder
źródło
2
Czym to się różni od poprzedniego pytania ?
svick
1
stackoverflow.com/questions/9290498/… Z parametrem ParallelOptions.
Chris Disley
4
@ChrisDisley, spowoduje to tylko zrównoleglenie uruchamiania żądań.
wydający
@svick ma rację, czym się różni? btw, uwielbiam tam odpowiedź stackoverflow.com/a/10802883/66372
eglasius
3
Poza tym HttpClientjest IDisposablei powinieneś się go pozbyć, zwłaszcza gdy masz zamiar użyć ich ponad 1000. HttpClientmoże być używany jako singleton dla wielu żądań.
Shimmy Weitzhandler

Odpowiedzi:

161

Z pewnością możesz to zrobić w najnowszych wersjach async dla .NET, używając .NET 4.5 Beta. Poprzedni post z „usr” wskazuje na dobry artykuł napisany przez Stephena Touba, ale mniej ogłoszoną wiadomością jest to, że semafor asynchroniczny faktycznie trafił do wersji beta .NET 4.5

Jeśli spojrzysz na naszą ukochaną SemaphoreSlimklasę (której powinieneś używać, ponieważ jest bardziej wydajna niż oryginalna Semaphore), teraz oferuje WaitAsync(...)serię przeciążeń, ze wszystkimi oczekiwanymi argumentami - przedziały czasu, tokeny anulowania, wszyscy twoi znajomi z harmonogramu: )

Stephen napisał również nowszy wpis na blogu dotyczący nowych dodatków .NET 4.5, które pojawiły się wraz z wersją beta, zobacz Co nowego w paralelizmie w .NET 4.5 Beta .

Na koniec, oto przykładowy kod dotyczący używania SemaphoreSlim do ograniczania metody asynchronicznej:

public async Task MyOuterMethod()
{
    // let's say there is a list of 1000+ URLs
    var urls = { "http://google.com", "http://yahoo.com", ... };

    // now let's send HTTP requests to each of these URLs in parallel
    var allTasks = new List<Task>();
    var throttler = new SemaphoreSlim(initialCount: 20);
    foreach (var url in urls)
    {
        // do an async wait until we can schedule again
        await throttler.WaitAsync();

        // using Task.Run(...) to run the lambda in its own parallel
        // flow on the threadpool
        allTasks.Add(
            Task.Run(async () =>
            {
                try
                {
                    var client = new HttpClient();
                    var html = await client.GetStringAsync(url);
                }
                finally
                {
                    throttler.Release();
                }
            }));
    }

    // won't get here until all urls have been put into tasks
    await Task.WhenAll(allTasks);

    // won't get here until all tasks have completed in some way
    // (either success or exception)
}

Na koniec warto wspomnieć o rozwiązaniu wykorzystującym planowanie w oparciu o TPL. W TPL można tworzyć zadania powiązane z delegatem, które nie zostały jeszcze uruchomione, i zezwolić niestandardowemu harmonogramowi zadań na ograniczenie współbieżności. W rzeczywistości jest tutaj próbka MSDN:

Zobacz także TaskScheduler .

Theo Yaung
źródło
3
czy nie jest paralelą. czy każdy z ograniczonym stopniem paralelizmu jest ładniejszym podejściem? msdn.microsoft.com/en-us/library/…
GreyCloud
2
Dlaczego się nie pozbędzieszHttpClient
Shimmy Weitzhandler,
4
@GreyCloud: Parallel.ForEachdziała z kodem synchronicznym. Umożliwia to wywołanie kodu asynchronicznego.
Josh Noe
2
@TheMonarch mylisz się . Poza tym to jest zawsze dobry zwyczaj, aby owinąć wszystkie IDisposablesw usinglub try-finallyoświadczeń, oraz zapewnić ich dyspozycji.
Shimmy Weitzhandler
29
Biorąc pod uwagę popularność tej odpowiedzi, warto zauważyć, że HttpClient może i powinien być pojedynczą wspólną instancją, a nie instancją na żądanie.
Rupert Rawnsley
15

Jeśli masz IEnumerable (tj. Ciągi adresów URL) i chcesz wykonać operację związaną z we / wy z każdym z nich (tj. Wykonać asynchroniczne żądanie http) jednocześnie ORAZ opcjonalnie chcesz również ustawić maksymalną liczbę równoczesnych Żądania we / wy w czasie rzeczywistym, oto jak możesz to zrobić. W ten sposób nie używasz puli wątków i innych, metoda używa semaforeslim do kontrolowania maksymalnej liczby równoczesnych żądań we / wy, podobnie jak we wzorcu przesuwanego okna, które kończy jedno żądanie, opuszcza semafor, a następne dostaje się.

użycie: await ForEachAsync (urlStrings, YourAsyncFunc, optionalMaxDegreeOfConcurrency);

public static Task ForEachAsync<TIn>(
        IEnumerable<TIn> inputEnumerable,
        Func<TIn, Task> asyncProcessor,
        int? maxDegreeOfParallelism = null)
    {
        int maxAsyncThreadCount = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism;
        SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount);

        IEnumerable<Task> tasks = inputEnumerable.Select(async input =>
        {
            await throttler.WaitAsync().ConfigureAwait(false);
            try
            {
                await asyncProcessor(input).ConfigureAwait(false);
            }
            finally
            {
                throttler.Release();
            }
        });

        return Task.WhenAll(tasks);
    }
Dogu Arslan
źródło
nie, nie powinieneś jawnie pozbywać się SemaphoreSlim w tej implementacji i użyciu, ponieważ jest używany wewnętrznie w metodzie, a metoda nie uzyskuje dostępu do jej właściwości AvailableWaitHandle, w którym to przypadku musielibyśmy usunąć ją lub zawinąć w bloku using.
Dogu Arslan
1
Myśląc tylko o najlepszych praktykach i lekcjach, których uczymy innych ludzi. usingByłoby miło.
AgentFire
cóż, ten przykład mogę naśladować, ale próbując znaleźć najlepszy sposób na zrobienie tego, w zasadzie mam przepustnicę, ale moja Func zwróci listę, którą ostatecznie chcę na ostatecznej liście wszystkich ukończonych po zakończeniu ... co może wymagają zablokowania na liście, czy masz sugestie.
Seabizkit
możesz nieznacznie zaktualizować metodę, aby zwracała listę rzeczywistych zadań i czekasz na Task.WhenAll z wnętrza kodu wywołującego. Po zakończeniu zadania Task.WhenAll możesz wyliczyć każde zadanie na liście i dodać jego listę do ostatecznej listy. Zmień sygnaturę metody na 'public static IEnumerable <Task <TOut>> ForEachAsync <TIn, TOut> (IEnumerable <TIn> inputEnumerable, Func <TIn, Task <TOut>> asyncProcessor, int? MaxDegreeOfParallelism = null)'
Dogu Arslan
7

Niestety w .NET Framework brakuje najważniejszych kombinatorów do organizowania równoległych zadań asynchronicznych. Nie ma czegoś takiego wbudowanego.

Spójrz na klasę AsyncSemaphore zbudowaną przez najbardziej szanowanego Stephena Touba. To, czego chcesz, nazywa się semaforem i potrzebujesz jego asynchronicznej wersji.

usr
źródło
12
Zauważ, że „Niestety, w .NET Framework brakuje najważniejszych kombinatorów do organizowania równoległych zadań asynchronicznych. Nie ma czegoś takiego wbudowanego”. nie jest już poprawna od wersji .NET 4.5 Beta. SemaphoreSlim oferuje teraz funkcjonalność WaitAsync (...) :)
Theo Yaung
Czy SemaphoreSlim (z nowymi metodami asynchronicznymi) powinien być preferowany zamiast AsyncSemphore, czy też implementacja Toub nadal ma jakąś zaletę?
Todd Menier
Moim zdaniem powinien być preferowany typ wbudowany, ponieważ prawdopodobnie jest dobrze przetestowany i dobrze zaprojektowany.
usr
4
Stephen dodał komentarz w odpowiedzi na pytanie w swoim blogu potwierdzające, że użycie SemaphoreSlim dla .NET 4.5 byłoby ogólnie dobrym rozwiązaniem.
jdasilva
7

Istnieje wiele pułapek, a bezpośrednie użycie semafora może być trudne w przypadku błędów, dlatego sugerowałbym użycie AsyncEnumerator pakietu NuGet zamiast ponownego wymyślania koła:

// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
await urls.ParallelForEachAsync(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
}, maxDegreeOfParalellism: 20);
Serge Semenov
źródło
4

Przykład Theo Yaunga jest fajny, ale istnieje wariant bez listy zadań oczekujących.

 class SomeChecker
 {
    private const int ThreadCount=20;
    private CountdownEvent _countdownEvent;
    private SemaphoreSlim _throttler;

    public Task Check(IList<string> urls)
    {
        _countdownEvent = new CountdownEvent(urls.Count);
        _throttler = new SemaphoreSlim(ThreadCount); 

        return Task.Run( // prevent UI thread lock
            async  () =>{
                foreach (var url in urls)
                {
                    // do an async wait until we can schedule again
                    await _throttler.WaitAsync();
                    ProccessUrl(url); // NOT await
                }
                //instead of await Task.WhenAll(allTasks);
                _countdownEvent.Wait();
            });
    }

    private async Task ProccessUrl(string url)
    {
        try
        {
            var page = await new WebClient()
                       .DownloadStringTaskAsync(new Uri(url)); 
            ProccessResult(page);
        }
        finally
        {
            _throttler.Release();
            _countdownEvent.Signal();
        }
    }

    private void ProccessResult(string page){/*....*/}
}
vitidev
źródło
4
Zauważ, że istnieje jedno niebezpieczeństwo stosowania tego podejścia - wszelkie wyjątki, które występują w ProccessUrllub jego podfunkcjach, zostaną w rzeczywistości zignorowane. Zostaną przechwycone do zadań, ale nie zostaną przeniesione z powrotem do pierwotnego rozmówcy Check(...). Osobiście dlatego nadal używam zadań i ich funkcji kombinatorów, takich jak WhenAlli WhenAny- aby uzyskać lepszą propagację błędów. :)
Theo Yaung
3

SemaphoreSlim może być tutaj bardzo pomocny. Oto utworzona przeze mnie metoda rozszerzenia.

    /// <summary>
    /// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
    /// <param name="action">an async <see cref="Action" /> to execute</param>
    /// <param name="maxActionsToRunInParallel">Optional, max numbers of the actions to run in parallel,
    /// Must be grater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxActionsToRunInParallel = null)
    {
        if (maxActionsToRunInParallel.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxActionsToRunInParallel.Value, maxActionsToRunInParallel.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        await action(item).ContinueWith(res =>
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        });
                    }));
                }

                // Wait for all of the provided tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

Przykładowe użycie:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);
Jay Shah
źródło
0

Stare pytanie, nowa odpowiedź. @vitidev miał blok kodu, który został ponownie użyty w prawie nienaruszonym stanie w recenzowanym przeze mnie projekcie. Po rozmowie z kilkoma współpracownikami jeden z nich zapytał: „Dlaczego nie użyjesz po prostu wbudowanych metod OC?” ActionBlock wygląda tam na zwycięzcę. https://msdn.microsoft.com/en-us/library/hh194773(v=vs.110).aspx . Prawdopodobnie nie zmieni żadnego istniejącego kodu, ale na pewno spróbuje zaadoptować ten nuget i ponownie wykorzystać najlepszą praktykę pana Softy'ego w zakresie ograniczonego równoległości.

Brak zwrotów Brak zwrotów
źródło
0

Oto rozwiązanie, które wykorzystuje leniwy charakter LINQ. Jest funkcjonalnie równoważne z zaakceptowaną odpowiedzią ), ale używa zadań roboczych zamiast a SemaphoreSlim, zmniejszając w ten sposób ślad pamięci całej operacji. Na początku pozwólmy mu działać bez dławienia. Pierwszym krokiem jest przekonwertowanie naszych adresów URL na niezliczone zadania.

string[] urls =
{
    "https://stackoverflow.com",
    "https://superuser.com",
    "https://serverfault.com",
    "https://meta.stackexchange.com",
    // ...
};
var httpClient = new HttpClient();
var tasks = urls.Select(async (url) =>
{
    return (Url: url, Html: await httpClient.GetStringAsync(url));
});

Drugim krokiem jest wykonanie awaitwszystkich zadań jednocześnie przy użyciu Task.WhenAllmetody:

var results = await Task.WhenAll(tasks);
foreach (var result in results)
{
    Console.WriteLine($"Url: {result.Url}, {result.Html.Length:#,0} chars");
}

Wynik:

Adres URL: https://stackoverflow.com , 105.574 znaków
Adres URL: https://superuser.com , 126.953 znaków
Adres URL: https://serverfault.com , 125.963 znaków
Adres URL: https://meta.stackexchange.com , 185.276 znaków
...

Wdrożenie Microsoft od Task.WhenAllmaterializuje natychmiast dostarczony przeliczalny do tablicy, powodując wszystkie zadania do rozruchów na raz. Nie chcemy tego, ponieważ chcemy ograniczyć liczbę współbieżnych operacji asynchronicznych. Musimy więc wdrożyć alternatywę WhenAll, która wyliczy nasze wyliczalne delikatnie i powoli. Zrobimy to, tworząc liczbę zadań roboczych (równych pożądanemu poziomowi współbieżności), a każde zadanie robocze będzie wyliczać jedno zadanie na raz, używając blokady, aby zapewnić, że każde zadanie adresu URL zostanie przetworzone tylko jednym zadaniem roboczym. Następnie awaitwykonujemy wszystkie zadania pracownika, a na koniec zwracamy wyniki. Oto realizacja:

public static async Task<T[]> WhenAll<T>(IEnumerable<Task<T>> tasks,
    int concurrencyLevel)
{
    if (tasks is ICollection<Task<T>>) throw new ArgumentException(
        "The enumerable should not be materialized.", nameof(tasks));
    var locker = new object();
    var results = new List<T>();
    var failed = false;
    using (var enumerator = tasks.GetEnumerator())
    {
        var workerTasks = Enumerable.Range(0, concurrencyLevel)
        .Select(async _ =>
        {
            try
            {
                while (true)
                {
                    Task<T> task;
                    int index;
                    lock (locker)
                    {
                        if (failed) break;
                        if (!enumerator.MoveNext()) break;
                        task = enumerator.Current;
                        index = results.Count;
                        results.Add(default); // Reserve space in the list
                    }
                    var result = await task.ConfigureAwait(false);
                    lock (locker) results[index] = result;
                }
            }
            catch (Exception)
            {
                lock (locker) failed = true;
                throw;
            }
        }).ToArray();
        await Task.WhenAll(workerTasks).ConfigureAwait(false);
    }
    lock (locker) return results.ToArray();
}

... a oto, co musimy zmienić w naszym początkowym kodzie, aby osiągnąć pożądane dławienie:

var results = await WhenAll(tasks, concurrencyLevel: 2);

Istnieje różnica w obsłudze wyjątków. Natywny Task.WhenAllczeka na zakończenie wszystkich zadań i agreguje wszystkie wyjątki. Powyższa implementacja kończy się natychmiast po zakończeniu pierwszego błędnego zadania.

Theodor Zoulias
źródło
Implementację AC # 8, która zwraca wartość, IAsyncEnumerable<T>można znaleźć tutaj .
Theodor Zoulias
-1

Chociaż 1000 zadań może zostać umieszczonych w kolejce bardzo szybko, biblioteka zadań równoległych może obsługiwać tylko zadania współbieżne równe liczbie rdzeni procesora w komputerze. Oznacza to, że jeśli masz maszynę czterordzeniową, tylko 4 zadania będą wykonywane w danym czasie (chyba że obniżysz MaxDegreeOfParallelism).

scottm
źródło
8
Tak, ale to nie dotyczy asynchronicznych operacji we / wy. Powyższy kod uruchomi ponad 1000 jednoczesnych pobrań, nawet jeśli działa w jednym wątku.
Grief Coder
Nie widziałem tam awaitsłowa kluczowego. Usunięcie tego powinno rozwiązać problem, prawda?
scottm
2
Biblioteka z pewnością może obsłużyć więcej zadań uruchomionych (ze Runningstatusem) jednocześnie niż liczba rdzeni. Będzie to szczególnie dotyczyło zadań związanych z We / Wy.
svick
@svick: tak. Czy wiesz, jak skutecznie kontrolować maksymalną liczbę jednoczesnych zadań TPL (nie wątków)?
Grief Coder
-1

W celu przyspieszenia operacji związanych z procesorem należy stosować obliczenia równoległe. Tutaj mówimy o operacjach związanych z I / O. Twoja implementacja powinna być całkowicie asynchroniczna , chyba że przytłaczasz zajęty pojedynczy rdzeń wielordzeniowego procesora.

EDYCJA Podoba mi się sugestia usr dotycząca użycia „asynchronicznego semafora”.

GregC
źródło
Słuszna uwaga! Chociaż każde zadanie tutaj będzie zawierało kod asynchroniczny i synchronizacyjny (strona pobierana asynchronicznie, a następnie przetwarzana w sposób synchroniczny). Próbuję rozprowadzić część synchronizacyjną kodu na procesory i jednocześnie ograniczyć ilość współbieżnych asynchronicznych operacji we / wy.
Grief Coder
Czemu? Ponieważ jednoczesne uruchamianie ponad 1000 żądań http może nie być zadaniem dobrze dopasowanym do przepustowości sieci użytkownika.
wydający
Rozszerzenia równoległe mogą być również używane jako sposób na multipleksowanie operacji we / wy bez konieczności ręcznego wdrażania czystego rozwiązania asynchronicznego. Co, zgadzam się, można by uznać za niedbałe, ale dopóki utrzymujesz ścisły limit liczby jednoczesnych operacji, prawdopodobnie nie będzie to zbytnio obciążać puli wątków.
Sean U
3
Nie sądzę, aby ta odpowiedź była odpowiedzią. Samo bycie asynchronicznym nie wystarczy: naprawdę chcemy dławić fizyczne IO w sposób nieblokujący.
usr
1
Hmm… nie jestem pewien, zgadzam się… kiedy pracujesz nad dużym projektem, jeśli jeden zbyt wielu programistów przyjmie ten pogląd, dostaniesz głodu, mimo że wkład każdego programisty w pojedynkę nie wystarczy, aby przechylić sprawę. Biorąc pod uwagę, że jest tylko jedna pula wątków, nawet jeśli traktujesz ją częściowo z szacunkiem ... jeśli wszyscy inni robią to samo, mogą wystąpić problemy. W związku z tym zawsze odradzam uruchamianie długich rzeczy w puli wątków.
wydający
-1

Użyj MaxDegreeOfParallelism, która jest opcją, którą możesz określić w Parallel.ForEach():

var options = new ParallelOptions { MaxDegreeOfParallelism = 20 };

Parallel.ForEach(urls, options,
    url =>
        {
            var client = new HttpClient();
            var html = client.GetStringAsync(url);
            // do stuff with html
        });
Sean U
źródło
4
Myślę, że to nie działa. GetStringAsync(url)jest przeznaczony do wywołania await. Jeśli sprawdzisz typ var html, to jest to a Task<string>, a nie wynik string.
Neal Ehardt
2
@NealEhardt ma rację. Parallel.ForEach(...)jest przeznaczony do równoległego uruchamiania bloków kodu synchronicznego (np. w różnych wątkach).
Theo Yaung
-1

Zasadniczo będziesz chciał utworzyć akcję lub zadanie dla każdego adresu URL, który chcesz trafić, umieścić je na liście, a następnie przetworzyć tę listę, ograniczając liczbę, którą można przetwarzać równolegle.

Mój post na blogu pokazuje, jak to zrobić zarówno za pomocą zadań, jak i akcji, oraz zawiera przykładowy projekt, który można pobrać i uruchomić, aby zobaczyć oba w akcji.

Z akcjami

Jeśli korzystasz z akcji, możesz użyć wbudowanej funkcji .Net Parallel.Invoke. Tutaj ograniczamy to do równoległego uruchamiania maksymalnie 20 wątków.

var listOfActions = new List<Action>();
foreach (var url in urls)
{
    var localUrl = url;
    // Note that we create the Task here, but do not start it.
    listOfTasks.Add(new Task(() => CallUrl(localUrl)));
}

var options = new ParallelOptions {MaxDegreeOfParallelism = 20};
Parallel.Invoke(options, listOfActions.ToArray());

Z zadaniami

W przypadku zadań nie ma wbudowanej funkcji. Możesz jednak skorzystać z tego, który udostępniam na moim blogu.

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
    {
        await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
    }

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel.
    /// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para>
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
    {
        // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly.
        var tasks = tasksToRun.ToList();

        using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
        {
            var postTaskTasks = new List<Task>();

            // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
            tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));

            // Start running each task.
            foreach (var task in tasks)
            {
                // Increment the number of tasks currently running and wait if too many are running.
                await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken);

                cancellationToken.ThrowIfCancellationRequested();
                task.Start();
            }

            // Wait for all of the provided tasks to complete.
            // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
            await Task.WhenAll(postTaskTasks.ToArray());
        }
    }

A następnie tworząc listę zadań i wywołując funkcję w celu ich uruchomienia, powiedzmy maksymalnie 20 jednocześnie, możesz zrobić to:

var listOfTasks = new List<Task>();
foreach (var url in urls)
{
    var localUrl = url;
    // Note that we create the Task here, but do not start it.
    listOfTasks.Add(new Task(async () => await CallUrl(localUrl)));
}
await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 20);
śmiertelny pies
źródło
Myślę, że właśnie określasz initialCount dla SemaphoreSlim i musisz określić drugi parametr, tj. MaxCount w konstruktorze SemaphoreSlim.
Jay Shah
Chcę, aby każda odpowiedź z każdego zadania była przetwarzana na listę. Jak mogę uzyskać zwrot wyniku lub odpowiedź
venkat
-1

nie jest to dobra praktyka, ponieważ zmienia globalną zmienną. nie jest to również ogólne rozwiązanie dla asynchronii. ale jest to łatwe dla wszystkich wystąpień HttpClient, jeśli to wszystko, czego szukasz. możesz po prostu spróbować:

System.Net.ServicePointManager.DefaultConnectionLimit = 20;
symbiont
źródło