Używanie asynchronizacji / oczekiwania na wiele zadań

406

Korzystam z klienta API, który jest całkowicie asynchroniczny, to znaczy każda operacja albo zwraca, Taskalbo Task<T>np .:

static async Task DoSomething(int siteId, int postId, IBlogClient client)
{
    await client.DeletePost(siteId, postId); // call API client
    Console.WriteLine("Deleted post {0}.", siteId);
}

Używając operatorów asynchronicznych / oczekujących C # 5, jaki jest właściwy / najbardziej wydajny sposób uruchamiania wielu zadań i czekania na ich zakończenie:

int[] ids = new[] { 1, 2, 3, 4, 5 };
Parallel.ForEach(ids, i => DoSomething(1, i, blogClient).Wait());

lub:

int[] ids = new[] { 1, 2, 3, 4, 5 };
Task.WaitAll(ids.Select(i => DoSomething(1, i, blogClient)).ToArray());

Ponieważ klient API używa HttpClient wewnętrznie, spodziewam się, że natychmiast wyda 5 żądań HTTP, pisząc do konsoli po zakończeniu każdego z nich.

Ben Foster
źródło
A w czym jest problem?
Serg Szewczenko
1
@SergShevchenko Problem polega na tym, że jego Parallel.ForEach jest wykonywany niepoprawnie (patrz odpowiedzi) - pyta, czy jego próby równoległego uruchomienia kodu asynchronicznego są prawidłowe, oferując dwie próby rozwiązania, a jeśli jedna jest lepsza od drugiej (i prawdopodobnie dlaczego tak jest) ).
AnorZaken

Odpowiedzi:

572
int[] ids = new[] { 1, 2, 3, 4, 5 };
Parallel.ForEach(ids, i => DoSomething(1, i, blogClient).Wait());

Chociaż operacje są uruchamiane równolegle z powyższym kodem, kod ten blokuje każdy wątek, na którym działa każda operacja. Na przykład, jeśli połączenie sieciowe trwa 2 sekundy, każdy wątek zawiesza się przez 2 sekundy bez robienia czegokolwiek poza czekaniem.

int[] ids = new[] { 1, 2, 3, 4, 5 };
Task.WaitAll(ids.Select(i => DoSomething(1, i, blogClient)).ToArray());

Z drugiej strony powyższy kod z WaitAllblokuje również wątki, a twoje wątki nie będą mogły przetwarzać żadnej innej pracy do czasu zakończenia operacji.

Zalecane podejście

Wolałbym, aby WhenAllTwoje operacje były wykonywane asynchronicznie równolegle.

public async Task DoWork() {

    int[] ids = new[] { 1, 2, 3, 4, 5 };
    await Task.WhenAll(ids.Select(i => DoSomething(1, i, blogClient)));
}

W rzeczywistości w powyższym przypadku nawet nie musisz await, możesz po prostu bezpośrednio powrócić z metody, ponieważ nie masz żadnych kontynuacji:

public Task DoWork() 
{
    int[] ids = new[] { 1, 2, 3, 4, 5 };
    return Task.WhenAll(ids.Select(i => DoSomething(1, i, blogClient)));
}

Aby to zrobić, oto szczegółowy post na blogu, w którym omówiono wszystkie alternatywy oraz ich zalety / wady: jak i gdzie współbieżne asynchroniczne operacje we / wy z interfejsem API sieci Web ASP.NET

tugberk
źródło
31
„powyższy kod z WaitAllblokuje również wątki” - czy nie blokuje tylko jednego wątku, tego, który wywołał WaitAll?
Rawling,
5
@Rawling dokumentacji stwierdza, że ​​„Typ: System.Threading.Tasks.Task [] Tablica instancji zadań, na które trzeba czekać.”. Blokuje więc wszystkie wątki.
Mixxiphoid
30
@Mixxiphoid: Podany przez Ciebie bit nie oznacza, że ​​blokuje on wszystkie wątki. Blokuje tylko wątek wywołujący podczas działania dostarczonych zadań. To, jak te zadania są faktycznie wykonywane, zależy od harmonogramu. Zazwyczaj po zakończeniu każdego zadania wątek, na którym był uruchomiony, byłby zwracany do puli. Każdy wątek nie pozostanie zablokowany, dopóki inne nie zostaną ukończone.
Musaul
3
@tugberk, Z tego, co rozumiem, jedyną różnicą między „klasycznymi” metodami zadań a odpowiednikami Async jest to, w jaki sposób wchodzą one w interakcje z wątkami między momentem rozpoczęcia zadania a jego zakończeniem. Klasyczna metoda w domyślnym terminarzu przerywa wątek w tym okresie (nawet jeśli „śpi”), podczas gdy asynchroniczne nie. Poza tym okresem nie ma różnicy, tzn. Zadanie jest zaplanowane, ale nie rozpoczęte, a kiedy jest zakończone, ale jego rozmówca nadal czeka.
Musaul
3
@tugberk Patrz stackoverflow.com/a/6123432/750216 różnica polega na tym, czy wątek wywołujący jest zablokowany, czy nie, reszta jest taka sama. Możesz zredagować odpowiedź w celu wyjaśnienia.
Răzvan Flavius ​​Panda
45

Byłem ciekawy wyników metod podanych w pytaniu, a także zaakceptowanej odpowiedzi, więc poddałem je próbie.

Oto kod:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncTest
{
    class Program
    {
        class Worker
        {
            public int Id;
            public int SleepTimeout;

            public async Task DoWork(DateTime testStart)
            {
                var workerStart = DateTime.Now;
                Console.WriteLine("Worker {0} started on thread {1}, beginning {2} seconds after test start.",
                    Id, Thread.CurrentThread.ManagedThreadId, (workerStart-testStart).TotalSeconds.ToString("F2"));
                await Task.Run(() => Thread.Sleep(SleepTimeout));
                var workerEnd = DateTime.Now;
                Console.WriteLine("Worker {0} stopped; the worker took {1} seconds, and it finished {2} seconds after the test start.",
                   Id, (workerEnd-workerStart).TotalSeconds.ToString("F2"), (workerEnd-testStart).TotalSeconds.ToString("F2"));
            }
        }

        static void Main(string[] args)
        {
            var workers = new List<Worker>
            {
                new Worker { Id = 1, SleepTimeout = 1000 },
                new Worker { Id = 2, SleepTimeout = 2000 },
                new Worker { Id = 3, SleepTimeout = 3000 },
                new Worker { Id = 4, SleepTimeout = 4000 },
                new Worker { Id = 5, SleepTimeout = 5000 },
            };

            var startTime = DateTime.Now;
            Console.WriteLine("Starting test: Parallel.ForEach...");
            PerformTest_ParallelForEach(workers, startTime);
            var endTime = DateTime.Now;
            Console.WriteLine("Test finished after {0} seconds.\n",
                (endTime - startTime).TotalSeconds.ToString("F2"));

            startTime = DateTime.Now;
            Console.WriteLine("Starting test: Task.WaitAll...");
            PerformTest_TaskWaitAll(workers, startTime);
            endTime = DateTime.Now;
            Console.WriteLine("Test finished after {0} seconds.\n",
                (endTime - startTime).TotalSeconds.ToString("F2"));

            startTime = DateTime.Now;
            Console.WriteLine("Starting test: Task.WhenAll...");
            var task = PerformTest_TaskWhenAll(workers, startTime);
            task.Wait();
            endTime = DateTime.Now;
            Console.WriteLine("Test finished after {0} seconds.\n",
                (endTime - startTime).TotalSeconds.ToString("F2"));

            Console.ReadKey();
        }

        static void PerformTest_ParallelForEach(List<Worker> workers, DateTime testStart)
        {
            Parallel.ForEach(workers, worker => worker.DoWork(testStart).Wait());
        }

        static void PerformTest_TaskWaitAll(List<Worker> workers, DateTime testStart)
        {
            Task.WaitAll(workers.Select(worker => worker.DoWork(testStart)).ToArray());
        }

        static Task PerformTest_TaskWhenAll(List<Worker> workers, DateTime testStart)
        {
            return Task.WhenAll(workers.Select(worker => worker.DoWork(testStart)));
        }
    }
}

I wynikowy wynik:

Starting test: Parallel.ForEach...
Worker 1 started on thread 1, beginning 0.21 seconds after test start.
Worker 4 started on thread 5, beginning 0.21 seconds after test start.
Worker 2 started on thread 3, beginning 0.21 seconds after test start.
Worker 5 started on thread 6, beginning 0.21 seconds after test start.
Worker 3 started on thread 4, beginning 0.21 seconds after test start.
Worker 1 stopped; the worker took 1.90 seconds, and it finished 2.11 seconds after the test start.
Worker 2 stopped; the worker took 3.89 seconds, and it finished 4.10 seconds after the test start.
Worker 3 stopped; the worker took 5.89 seconds, and it finished 6.10 seconds after the test start.
Worker 4 stopped; the worker took 5.90 seconds, and it finished 6.11 seconds after the test start.
Worker 5 stopped; the worker took 8.89 seconds, and it finished 9.10 seconds after the test start.
Test finished after 9.10 seconds.

Starting test: Task.WaitAll...
Worker 1 started on thread 1, beginning 0.01 seconds after test start.
Worker 2 started on thread 1, beginning 0.01 seconds after test start.
Worker 3 started on thread 1, beginning 0.01 seconds after test start.
Worker 4 started on thread 1, beginning 0.01 seconds after test start.
Worker 5 started on thread 1, beginning 0.01 seconds after test start.
Worker 1 stopped; the worker took 1.00 seconds, and it finished 1.01 seconds after the test start.
Worker 2 stopped; the worker took 2.00 seconds, and it finished 2.01 seconds after the test start.
Worker 3 stopped; the worker took 3.00 seconds, and it finished 3.01 seconds after the test start.
Worker 4 stopped; the worker took 4.00 seconds, and it finished 4.01 seconds after the test start.
Worker 5 stopped; the worker took 5.00 seconds, and it finished 5.01 seconds after the test start.
Test finished after 5.01 seconds.

Starting test: Task.WhenAll...
Worker 1 started on thread 1, beginning 0.00 seconds after test start.
Worker 2 started on thread 1, beginning 0.00 seconds after test start.
Worker 3 started on thread 1, beginning 0.00 seconds after test start.
Worker 4 started on thread 1, beginning 0.00 seconds after test start.
Worker 5 started on thread 1, beginning 0.00 seconds after test start.
Worker 1 stopped; the worker took 1.00 seconds, and it finished 1.00 seconds after the test start.
Worker 2 stopped; the worker took 2.00 seconds, and it finished 2.00 seconds after the test start.
Worker 3 stopped; the worker took 3.00 seconds, and it finished 3.00 seconds after the test start.
Worker 4 stopped; the worker took 4.00 seconds, and it finished 4.00 seconds after the test start.
Worker 5 stopped; the worker took 5.00 seconds, and it finished 5.00 seconds after the test start.
Test finished after 5.00 seconds.
RiaanDP
źródło
2
Jeśli poświęcisz czas na każdy z tych wyników, byłoby to bardziej przydatne
Serj Sagan
8
@SerjSagan moim początkowym pomysłem było sprawdzenie, czy pracownicy są uruchamiani jednocześnie w każdym przypadku, ale dodałem znaczniki czasu, aby poprawić przejrzystość testu. Dzieki za sugestie.
RiaanDP
Dziękuję za test. Jednak wydaje się nieco dziwne, że używasz wątku. Śpij w wątku innym niż „wątek roboczy”. W tym przypadku nie ma to znaczenia, ale czy nie miałoby to większego sensu w zadaniu. Uruchomienie wątków roboczych, jeśli symulujemy pracę obliczeniową, lub po prostu zadanie. Opóźnienie zamiast snu, jeśli symulujemy operacje we / wy? Sprawdzam tylko, co o tym myślisz.
AnorZaken
24

Ponieważ wywoływany interfejs API jest asynchroniczny, Parallel.ForEachwersja nie ma większego sensu. Nie powinieneś używać .Waittej WaitAllwersji, ponieważ straciłoby to równoległość. Inna alternatywa, jeśli wywołujący używa asynchronii, korzysta Task.WhenAllpo wykonaniu Selecti ToArraywygenerowaniu tablicy zadań. Drugą alternatywą jest użycie Rx 2.0

James Manning
źródło
10

Możesz użyć Task.WhenAllfunkcji, którą możesz przekazać n zadań; Task.WhenAllzwróci zadanie, które zostanie ukończone, gdy wszystkie zadania, które przekazałeś do Task.WhenAllwykonania. Musisz czekać asynchronicznie Task.WhenAll, aby nie zablokować wątku interfejsu użytkownika:

   public async Task DoSomeThing() {

       var Task[] tasks = new Task[numTasks];
       for(int i = 0; i < numTask; i++)
       {
          tasks[i] = CallSomeAsync();
       }
       await Task.WhenAll(tasks);
       // code that'll execute on UI thread
   }
Ahmed Wasim
źródło
8

Parallel.ForEachwymaga listy pracowników zdefiniowanych przez użytkownika i nie asynchronizacji Action z każdym pracownikiem.

Task.WaitAlli Task.WhenAllwymagająList<Task> , które z definicji są asynchroniczne.

Znalazłem RiaanDP „s odpowiedź bardzo przydatna, aby zrozumieć różnicę, ale to wymaga korekty o Parallel.ForEach. Brak wystarczającej reputacji, aby odpowiedzieć na jego komentarz, a więc moja własna odpowiedź.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncTest
{
    class Program
    {
        class Worker
        {
            public int Id;
            public int SleepTimeout;

            public void DoWork(DateTime testStart)
            {
                var workerStart = DateTime.Now;
                Console.WriteLine("Worker {0} started on thread {1}, beginning {2} seconds after test start.",
                    Id, Thread.CurrentThread.ManagedThreadId, (workerStart - testStart).TotalSeconds.ToString("F2"));
                Thread.Sleep(SleepTimeout);
                var workerEnd = DateTime.Now;
                Console.WriteLine("Worker {0} stopped; the worker took {1} seconds, and it finished {2} seconds after the test start.",
                   Id, (workerEnd - workerStart).TotalSeconds.ToString("F2"), (workerEnd - testStart).TotalSeconds.ToString("F2"));
            }

            public async Task DoWorkAsync(DateTime testStart)
            {
                var workerStart = DateTime.Now;
                Console.WriteLine("Worker {0} started on thread {1}, beginning {2} seconds after test start.",
                    Id, Thread.CurrentThread.ManagedThreadId, (workerStart - testStart).TotalSeconds.ToString("F2"));
                await Task.Run(() => Thread.Sleep(SleepTimeout));
                var workerEnd = DateTime.Now;
                Console.WriteLine("Worker {0} stopped; the worker took {1} seconds, and it finished {2} seconds after the test start.",
                   Id, (workerEnd - workerStart).TotalSeconds.ToString("F2"), (workerEnd - testStart).TotalSeconds.ToString("F2"));
            }
        }

        static void Main(string[] args)
        {
            var workers = new List<Worker>
            {
                new Worker { Id = 1, SleepTimeout = 1000 },
                new Worker { Id = 2, SleepTimeout = 2000 },
                new Worker { Id = 3, SleepTimeout = 3000 },
                new Worker { Id = 4, SleepTimeout = 4000 },
                new Worker { Id = 5, SleepTimeout = 5000 },
            };

            var startTime = DateTime.Now;
            Console.WriteLine("Starting test: Parallel.ForEach...");
            PerformTest_ParallelForEach(workers, startTime);
            var endTime = DateTime.Now;
            Console.WriteLine("Test finished after {0} seconds.\n",
                (endTime - startTime).TotalSeconds.ToString("F2"));

            startTime = DateTime.Now;
            Console.WriteLine("Starting test: Task.WaitAll...");
            PerformTest_TaskWaitAll(workers, startTime);
            endTime = DateTime.Now;
            Console.WriteLine("Test finished after {0} seconds.\n",
                (endTime - startTime).TotalSeconds.ToString("F2"));

            startTime = DateTime.Now;
            Console.WriteLine("Starting test: Task.WhenAll...");
            var task = PerformTest_TaskWhenAll(workers, startTime);
            task.Wait();
            endTime = DateTime.Now;
            Console.WriteLine("Test finished after {0} seconds.\n",
                (endTime - startTime).TotalSeconds.ToString("F2"));

            Console.ReadKey();
        }

        static void PerformTest_ParallelForEach(List<Worker> workers, DateTime testStart)
        {
            Parallel.ForEach(workers, worker => worker.DoWork(testStart));
        }

        static void PerformTest_TaskWaitAll(List<Worker> workers, DateTime testStart)
        {
            Task.WaitAll(workers.Select(worker => worker.DoWorkAsync(testStart)).ToArray());
        }

        static Task PerformTest_TaskWhenAll(List<Worker> workers, DateTime testStart)
        {
            return Task.WhenAll(workers.Select(worker => worker.DoWorkAsync(testStart)));
        }
    }
}

Wynikowy wynik jest poniżej. Czasy wykonania są porównywalne. Przeprowadziłem ten test, gdy mój komputer wykonywał cotygodniowe skanowanie antywirusowe. Zmiana kolejności testów zmieniła czasy ich wykonywania.

Starting test: Parallel.ForEach...
Worker 1 started on thread 9, beginning 0.02 seconds after test start.
Worker 2 started on thread 10, beginning 0.02 seconds after test start.
Worker 3 started on thread 11, beginning 0.02 seconds after test start.
Worker 4 started on thread 13, beginning 0.03 seconds after test start.
Worker 5 started on thread 14, beginning 0.03 seconds after test start.
Worker 1 stopped; the worker took 1.00 seconds, and it finished 1.02 seconds after the test start.
Worker 2 stopped; the worker took 2.00 seconds, and it finished 2.02 seconds after the test start.
Worker 3 stopped; the worker took 3.00 seconds, and it finished 3.03 seconds after the test start.
Worker 4 stopped; the worker took 4.00 seconds, and it finished 4.03 seconds after the test start.
Worker 5 stopped; the worker took 5.00 seconds, and it finished 5.03 seconds after the test start.
Test finished after 5.03 seconds.

Starting test: Task.WaitAll...
Worker 1 started on thread 9, beginning 0.00 seconds after test start.
Worker 2 started on thread 9, beginning 0.00 seconds after test start.
Worker 3 started on thread 9, beginning 0.00 seconds after test start.
Worker 4 started on thread 9, beginning 0.00 seconds after test start.
Worker 5 started on thread 9, beginning 0.01 seconds after test start.
Worker 1 stopped; the worker took 1.00 seconds, and it finished 1.01 seconds after the test start.
Worker 2 stopped; the worker took 2.00 seconds, and it finished 2.01 seconds after the test start.
Worker 3 stopped; the worker took 3.00 seconds, and it finished 3.01 seconds after the test start.
Worker 4 stopped; the worker took 4.00 seconds, and it finished 4.01 seconds after the test start.
Worker 5 stopped; the worker took 5.00 seconds, and it finished 5.01 seconds after the test start.
Test finished after 5.01 seconds.

Starting test: Task.WhenAll...
Worker 1 started on thread 9, beginning 0.00 seconds after test start.
Worker 2 started on thread 9, beginning 0.00 seconds after test start.
Worker 3 started on thread 9, beginning 0.00 seconds after test start.
Worker 4 started on thread 9, beginning 0.00 seconds after test start.
Worker 5 started on thread 9, beginning 0.00 seconds after test start.
Worker 1 stopped; the worker took 1.00 seconds, and it finished 1.00 seconds after the test start.
Worker 2 stopped; the worker took 2.00 seconds, and it finished 2.00 seconds after the test start.
Worker 3 stopped; the worker took 3.00 seconds, and it finished 3.00 seconds after the test start.
Worker 4 stopped; the worker took 4.00 seconds, and it finished 4.00 seconds after the test start.
Worker 5 stopped; the worker took 5.00 seconds, and it finished 5.01 seconds after the test start.
Test finished after 5.01 seconds.
JPortillo
źródło