Równoległe foreach z asynchroniczną lambdą

150

Chciałbym obsługiwać kolekcję równolegle, ale mam problemy z jej wdrożeniem i dlatego liczę na pomoc.

Problem pojawia się, jeśli chcę wywołać metodę oznaczoną jako asynchroniczna w C #, w obrębie lambda pętli równoległej. Na przykład:

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}
var count = bag.Count;

Problem występuje, gdy liczba wynosi 0, ponieważ wszystkie utworzone wątki są w rzeczywistości tylko wątkami w tle, a Parallel.ForEachwywołanie nie czeka na zakończenie. Jeśli usunę słowo kluczowe async, metoda wygląda następująco:

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
  // some pre stuff
  var responseTask = await GetData(item);
  responseTask.Wait();
  var response = responseTask.Result;
  bag.Add(response);
  // some post stuff
}
var count = bag.Count;

Działa, ale całkowicie wyłącza oczekiwaną spryt i muszę wykonać ręczną obsługę wyjątków ... (Usunięto dla zwięzłości).

Jak mogę zaimplementować Parallel.ForEachpętlę, która używa słowa kluczowego await w lambdzie? Czy to możliwe?

Prototyp metody Parallel.ForEach przyjmuje Action<T>parametr as, ale chcę, aby czekał na moją asynchroniczną lambdę.

clausndk
źródło
1
Zakładam, że chciałeś usunąć awaitz await GetData(item)drugiego bloku kodu, ponieważ spowodowałoby to błąd kompilacji w obecnej postaci.
Josh M.
2
Możliwy duplikat zagnieżdżenia czeka w trybie równoległym.
ForEach

Odpowiedzi:

204

Jeśli chcesz tylko prostego równoległości, możesz to zrobić:

var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;

Jeśli potrzebujesz czegoś bardziej skomplikowanego, sprawdź Stephena Toub za ForEachAsyncposta .

Stephen Cleary
źródło
50
Prawdopodobnie potrzebny jest mechanizm dławiący. Spowoduje to natychmiastowe utworzenie tylu zadań, ile jest elementów, które mogą zakończyć się 10 tys. Żądań sieciowych i tym podobnych.
usr
10
@usr Ostatni przykład w artykule Stephena Tubusa dotyczy tego.
svick
@svick Zastanawiałem się nad ostatnią próbką. Wydaje mi się, że po prostu grupuje mnóstwo zadań, aby utworzyć dla mnie więcej zadań, ale wszystkie zaczynają się masowo.
Luke Puplett
2
@LukePuplett Tworzy dopzadania, a każdy z nich przetwarza szeregowo pewien podzbiór kolekcji wejściowej.
Svick
4
@Afshin_Zavvar: Jeśli wywołasz Task.Runbez awaitwyniku, to po prostu wrzucisz pracę „uruchom i zapomnij” do puli wątków. To prawie zawsze błąd.
Stephen Cleary
83

Można użyć ParallelForEachAsyncmetody rozszerzenia z pakietu AsyncEnumerator NuGet :

using Dasync.Collections;

var bag = new ConcurrentBag<object>();
await myCollection.ParallelForEachAsync(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}, maxDegreeOfParallelism: 10);
var count = bag.Count;
Serge Semenov
źródło
1
To jest twoja paczka? Widziałem, jak publikowałeś to teraz w kilku miejscach? : D Aha… Twoje imię jest na przesyłce: D +1
Piotr Kula
20
@ppumkin, tak, jest moje. Widziałem ten problem w kółko, więc zdecydowaliśmy się go rozwiązać w najprostszy sposób możliwe i bezpłatne innym walczy jak dobrze :)
Serge Semenov
Dzięki… to zdecydowanie ma sens i bardzo mi pomogło!
Piotr Kula
2
masz literówkę: maxDegreeOfParallelism>maxDegreeOfParalellism
Shiran Dror
3
Prawidłowa pisownia to rzeczywiście maxDegreeOfParallelism, ale jest coś w komentarzu @ ShiranDror - w swoim pakiecie przez pomyłkę nazwałaś zmienną maxDegreeOfParalellism (i dlatego Twój cytowany kod nie skompiluje się, dopóki go nie zmienisz ..)
BornToCode
19

Dzięki temu SemaphoreSlimmożna uzyskać kontrolę równoległości.

var bag = new ConcurrentBag<object>();
var maxParallel = 20;
var throttler = new SemaphoreSlim(initialCount: maxParallel);
var tasks = myCollection.Select(async item =>
{
  try
  {
     await throttler.WaitAsync();
     var response = await GetData(item);
     bag.Add(response);
  }
  finally
  {
     throttler.Release();
  }
});
await Task.WhenAll(tasks);
var count = bag.Count;
Felipe l
źródło
SemaphoreSlim powinien być opakowany usingoświadczeniem, ponieważ implementuje IDisposable
Sal
4

Moja lekka implementacja Async ParallelForEach.

Cechy:

  1. Ograniczanie (maksymalny stopień równoległości).
  2. Obsługa wyjątków (wyjątek agregacji zostanie zgłoszony po zakończeniu).
  3. Wydajna pamięć (nie ma potrzeby przechowywania listy zadań).

public static class AsyncEx
{
    public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism = 10)
    {
        var semaphoreSlim = new SemaphoreSlim(maxDegreeOfParallelism);
        var tcs = new TaskCompletionSource<object>();
        var exceptions = new ConcurrentBag<Exception>();
        bool addingCompleted = false;

        foreach (T item in source)
        {
            await semaphoreSlim.WaitAsync();
            asyncAction(item).ContinueWith(t =>
            {
                semaphoreSlim.Release();

                if (t.Exception != null)
                {
                    exceptions.Add(t.Exception);
                }

                if (Volatile.Read(ref addingCompleted) && semaphoreSlim.CurrentCount == maxDegreeOfParallelism)
                {
                    tcs.TrySetResult(null);
                }
            });
        }

        Volatile.Write(ref addingCompleted, true);
        await tcs.Task;
        if (exceptions.Count > 0)
        {
            throw new AggregateException(exceptions);
        }
    }
}

Przykład użycia:

await Enumerable.Range(1, 10000).ParallelForEachAsync(async (i) =>
{
    var data = await GetData(i);
}, maxDegreeOfParallelism: 100);
nicolay.anykienko
źródło
tcs.SetResult(null)trzeba wymienić na tcs.TrySetResult(null)
Hocas
@Hocas, jak myślisz, dlaczego TrySetResult jest potrzebny?
nicolay.anykienko
Miałem problem z wieloma wywołaniami, SetResultkiedy ostatnio użyłem tego kodu) Kiedy używać SetResult () vs TrySetResult ()
Hocas
@Hocas, to interesujące. Nie oczekuje się dwukrotnego wykonania tcs.SetResult (null).
nicolay.anykienko
Używanie CurrentCountwłaściwości the SemaphoreSlimdo kontrolowania przepływu wykonania nie jest dobrym pomysłem. W większości przypadków stwarza warunki wyścigu. Używanie Volatile.Readjest również chwiejne (inny możliwy stan wyścigu). Nie ufałbym temu rozwiązaniu w środowisku produkcyjnym.
Theodor Zoulias
2

Stworzyłem do tego metodę rozszerzającą, która wykorzystuje SemaphoreSlim, a także pozwala ustawić maksymalny stopień równoległości

    /// <summary>
    /// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
    /// <param name="action">an async <see cref="Action" /> to execute</param>
    /// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
    /// Must be grater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxDegreeOfParallelism = null)
    {
        if (maxDegreeOfParallelism.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        await action(item).ContinueWith(res =>
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        });
                    }));
                }

                // Wait for all tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

Przykładowe użycie:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);
Jay Shah
źródło
„używanie” nie pomoże. pętla foreach będzie czekała na semafon w nieskończoność. Po prostu wypróbuj ten prosty kod, który odtwarza problem: await Enumerable.Range (1, 4) .ForEachAsyncConcurrent (async (i) => {Console.WriteLine (i); wyrzuć nowy wyjątek ("testowy wyjątek");}, maxDegreeOfParallelism: 2);
nicolay.anykienko
@ nicolay.anykienko masz rację co do # 2. Ten problem z pamięcią można rozwiązać, dodając taskWithThrottler.RemoveAll (x => x.IsCompleted);
askids
1
Wypróbowałem to w moim kodzie i jeśli maxDegreeOfParallelism nie jest zerowe, kod jest zakleszczony. Tutaj możesz zobaczyć cały kod do odtworzenia: stackoverflow.com/questions/58793118/…
Massimo Savazzi