Chciałbym obsługiwać kolekcję równolegle, ale mam problemy z jej wdrożeniem i dlatego liczę na pomoc.
Problem pojawia się, jeśli chcę wywołać metodę oznaczoną jako asynchroniczna w C #, w obrębie lambda pętli równoległej. Na przykład:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
}
var count = bag.Count;
Problem występuje, gdy liczba wynosi 0, ponieważ wszystkie utworzone wątki są w rzeczywistości tylko wątkami w tle, a Parallel.ForEach
wywołanie nie czeka na zakończenie. Jeśli usunę słowo kluczowe async, metoda wygląda następująco:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
// some pre stuff
var responseTask = await GetData(item);
responseTask.Wait();
var response = responseTask.Result;
bag.Add(response);
// some post stuff
}
var count = bag.Count;
Działa, ale całkowicie wyłącza oczekiwaną spryt i muszę wykonać ręczną obsługę wyjątków ... (Usunięto dla zwięzłości).
Jak mogę zaimplementować Parallel.ForEach
pętlę, która używa słowa kluczowego await w lambdzie? Czy to możliwe?
Prototyp metody Parallel.ForEach przyjmuje Action<T>
parametr as, ale chcę, aby czekał na moją asynchroniczną lambdę.
await
zawait GetData(item)
drugiego bloku kodu, ponieważ spowodowałoby to błąd kompilacji w obecnej postaci.Odpowiedzi:
Jeśli chcesz tylko prostego równoległości, możesz to zrobić:
var bag = new ConcurrentBag<object>(); var tasks = myCollection.Select(async item => { // some pre stuff var response = await GetData(item); bag.Add(response); // some post stuff }); await Task.WhenAll(tasks); var count = bag.Count;
Jeśli potrzebujesz czegoś bardziej skomplikowanego, sprawdź Stephena Toub za
ForEachAsync
posta .źródło
dop
zadania, a każdy z nich przetwarza szeregowo pewien podzbiór kolekcji wejściowej.Task.Run
bezawait
wyniku, to po prostu wrzucisz pracę „uruchom i zapomnij” do puli wątków. To prawie zawsze błąd.Można użyć
ParallelForEachAsync
metody rozszerzenia z pakietu AsyncEnumerator NuGet :using Dasync.Collections; var bag = new ConcurrentBag<object>(); await myCollection.ParallelForEachAsync(async item => { // some pre stuff var response = await GetData(item); bag.Add(response); // some post stuff }, maxDegreeOfParallelism: 10); var count = bag.Count;
źródło
maxDegreeOfParallelism
>maxDegreeOfParalellism
Dzięki temu
SemaphoreSlim
można uzyskać kontrolę równoległości.var bag = new ConcurrentBag<object>(); var maxParallel = 20; var throttler = new SemaphoreSlim(initialCount: maxParallel); var tasks = myCollection.Select(async item => { try { await throttler.WaitAsync(); var response = await GetData(item); bag.Add(response); } finally { throttler.Release(); } }); await Task.WhenAll(tasks); var count = bag.Count;
źródło
using
oświadczeniem, ponieważ implementuje IDisposableMoja lekka implementacja Async ParallelForEach.
Cechy:
public static class AsyncEx { public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism = 10) { var semaphoreSlim = new SemaphoreSlim(maxDegreeOfParallelism); var tcs = new TaskCompletionSource<object>(); var exceptions = new ConcurrentBag<Exception>(); bool addingCompleted = false; foreach (T item in source) { await semaphoreSlim.WaitAsync(); asyncAction(item).ContinueWith(t => { semaphoreSlim.Release(); if (t.Exception != null) { exceptions.Add(t.Exception); } if (Volatile.Read(ref addingCompleted) && semaphoreSlim.CurrentCount == maxDegreeOfParallelism) { tcs.TrySetResult(null); } }); } Volatile.Write(ref addingCompleted, true); await tcs.Task; if (exceptions.Count > 0) { throw new AggregateException(exceptions); } } }
Przykład użycia:
await Enumerable.Range(1, 10000).ParallelForEachAsync(async (i) => { var data = await GetData(i); }, maxDegreeOfParallelism: 100);
źródło
tcs.SetResult(null)
trzeba wymienić natcs.TrySetResult(null)
SetResult
kiedy ostatnio użyłem tego kodu) Kiedy używać SetResult () vs TrySetResult ()CurrentCount
właściwości theSemaphoreSlim
do kontrolowania przepływu wykonania nie jest dobrym pomysłem. W większości przypadków stwarza warunki wyścigu. UżywanieVolatile.Read
jest również chwiejne (inny możliwy stan wyścigu). Nie ufałbym temu rozwiązaniu w środowisku produkcyjnym.Stworzyłem do tego metodę rozszerzającą, która wykorzystuje SemaphoreSlim, a także pozwala ustawić maksymalny stopień równoległości
/// <summary> /// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/> /// </summary> /// <typeparam name="T">Type of IEnumerable</typeparam> /// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param> /// <param name="action">an async <see cref="Action" /> to execute</param> /// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism, /// Must be grater than 0</param> /// <returns>A Task representing an async operation</returns> /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception> public static async Task ForEachAsyncConcurrent<T>( this IEnumerable<T> enumerable, Func<T, Task> action, int? maxDegreeOfParallelism = null) { if (maxDegreeOfParallelism.HasValue) { using (var semaphoreSlim = new SemaphoreSlim( maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value)) { var tasksWithThrottler = new List<Task>(); foreach (var item in enumerable) { // Increment the number of currently running tasks and wait if they are more than limit. await semaphoreSlim.WaitAsync(); tasksWithThrottler.Add(Task.Run(async () => { await action(item).ContinueWith(res => { // action is completed, so decrement the number of currently running tasks semaphoreSlim.Release(); }); })); } // Wait for all tasks to complete. await Task.WhenAll(tasksWithThrottler.ToArray()); } } else { await Task.WhenAll(enumerable.Select(item => action(item))); } }
Przykładowe użycie:
await enumerable.ForEachAsyncConcurrent( async item => { await SomeAsyncMethod(item); }, 5);
źródło