Cała idea Parallel.ForEach()
polega na tym, że masz zestaw wątków, a każdy wątek przetwarza część kolekcji. Jak zauważyłeś, nie działa to z async
- await
, gdzie chcesz zwolnić wątek na czas trwania wywołania asynchronicznego.
Możesz to „naprawić”, blokując ForEach()
wątki, ale to niweczy cały punkt async
- await
.
Zamiast tego możesz użyć przepływu danych TPLParallel.ForEach()
, który Task
dobrze obsługuje asynchroniczne s.
Konkretnie, twój kod może być napisany przy użyciu, TransformBlock
który przekształca każdy identyfikator w Customer
przy użyciu async
lambda. Blok ten można skonfigurować do wykonywania równoległego. Łączyłbyś ten blok z pismem, ActionBlock
który zapisuje każdy Customer
do konsoli. Po skonfigurowaniu sieci bloków możesz Post()
przypisać każdy identyfikator do TransformBlock
.
W kodzie:
var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var getCustomerBlock = new TransformBlock<string, Customer>(
async i =>
{
ICustomerRepo repo = new CustomerRepo();
return await repo.GetCustomer(i);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
writeCustomerBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
foreach (var id in ids)
getCustomerBlock.Post(id);
getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();
Chociaż prawdopodobnie chcesz ograniczyć paralelizm tej TransformBlock
małej stałej. Ponadto możesz ograniczyć pojemność TransformBlock
i dodać do niej elementy asynchronicznie SendAsync()
, na przykład, jeśli kolekcja jest zbyt duża.
Dodatkową korzyścią w porównaniu z twoim kodem (jeśli zadziałał) jest to, że pisanie rozpocznie się, gdy tylko pojedynczy element zostanie zakończony, i nie czekaj, aż całe przetwarzanie zostanie zakończone.
Parallel.ForEach()
zPost()
przedmiotów równolegle nie powinno mieć żadnego rzeczywistego efektu.Odpowiedź Svicka jest (jak zwykle) doskonała.
Uważam jednak, że przepływ danych jest bardziej przydatny, gdy faktycznie masz duże ilości danych do przesłania. Lub gdy potrzebujesz
async
kolejki kompatybilnej.W twoim przypadku prostszym rozwiązaniem jest po prostu użycie
async
równoległości stylu:źródło
Parallel.ForEach()
). Ale myślę, że obecnie jest to najlepsza opcja do wykonywania prawie każdejasync
pracy z kolekcjami.ParallelOptions
pomożesz? Ma zastosowanie tylko do tychParallel.For/ForEach/Invoke
, które zgodnie z ustalonym PO nie mają tu zastosowania.GetCustomer
metoda zwraca aTask<T>
, Czy należy jej używaćSelect(async i => { await repo.GetCustomer(i);});
?Parallel.ForEach
nie obsługujeasync
.Używanie DataFlow zgodnie z sugestią Svicka może być przesadą, a odpowiedź Stephena nie zapewnia środków do kontrolowania współbieżności operacji. Można to jednak osiągnąć po prostu:
Te
ToArray()
połączenia mogą być optymalizowane przy użyciu tablicę zamiast listy i zastąpienie wykonanych zadań, ale wątpię, by to zrobić wielkiej różnicy w większości scenariuszy. Przykładowe użycie według pytania PO:EDIT Inny użytkownik SO i kreator TPL, Eli Arbel, wskazał mi powiązany artykuł Stephena Touba . Jak zwykle jego implementacja jest zarówno elegancka, jak i wydajna:
źródło
Partitioner.Create
wykorzystuje partycjonowanie porcji, które zapewnia elementy dynamicznie do różnych zadań, więc opisany scenariusz się nie wydarzy . Należy również pamiętać, że statyczne (wcześniej określone) partycjonowanie może być szybsze w niektórych przypadkach ze względu na mniejszy narzut (szczególnie synchronizację). Aby uzyskać więcej informacji, zobacz: msdn.microsoft.com/en-us/library/dd997411(v=vs.110).aspx .Task.WhenAll
) będzie zawierać wyjątek (wewnątrz anAggregateException
), a w konsekwencji, jeśli wspomniany dzwoniący użyjeawait
, wyjątek zostanie zgłoszony w witrynie wywołującej. JednakTask.WhenAll
nadal będzie czekać na zakończenie wszystkich zadań iGetPartitions
dynamicznie przydziela elementy popartition.MoveNext
wywołaniu, dopóki nie pozostanie więcej elementów do przetworzenia. Oznacza to, że dopóki nie dodasz własnego mechanizmu zatrzymującego przetwarzanie (np.CancellationToken
), Nie stanie się to samo.var current = partition.Current
wcześniej,await body
a następnie użyćcurrent
w kontynuacji (ContinueWith(t => { ... }
).Możesz zaoszczędzić wysiłek dzięki nowemu pakietowi NuGet AsyncEnumerator , który nie istniał 4 lata temu, kiedy pytanie zostało pierwotnie opublikowane. Pozwala kontrolować stopień równoległości:
Oświadczenie: Jestem autorem biblioteki AsyncEnumerator, która jest oprogramowaniem typu open source i jest licencjonowana na MIT, i wysyłam tę wiadomość, aby pomóc społeczności.
źródło
AsyncStreams
i muszę powiedzieć, że jest doskonała. Nie mogę wystarczająco polecić tej biblioteki.Zawiń
Parallel.Foreach
w aTask.Run()
i zamiastawait
słowa kluczowego użyj[yourasyncmethod].Result
(musisz wykonać zadanie Task.Run, aby nie blokować wątku interfejsu użytkownika)
Coś takiego:
źródło
Parallel.ForEach
do pracy równoległej, która blokuje aż wszystkie są zrobione, a następnie wcisnąć całą rzecz do wątku tła mieć czułe UI. Masz z tym jakieś problemy? Może to za dużo jednego śpiącego wątku, ale jest to krótki, czytelny kod.Task.Run
kiedyTaskCompletionSource
jest to preferowane.TaskCompletionSource
lepszy?await
można przesunąć z przodu, aby zapisać nazwę dodatkowej zmiennej.Powinno to być dość wydajne i łatwiejsze niż uruchomienie całego przepływu danych TPL:
źródło
await
takich jak:var customers = await ids.SelectAsync(async i => { ... });
?Jestem trochę spóźniony na imprezę, ale możesz rozważyć użycie GetAwaiter.GetResult (), aby uruchomić kod asynchroniczny w kontekście synchronizacji, ale tak równolegle, jak poniżej;
źródło
Metoda rozszerzenia dla tego, która wykorzystuje SemaphoreSlim, a także pozwala ustawić maksymalny stopień równoległości
Przykładowe użycie:
źródło
Po wprowadzeniu szeregu metod pomocniczych będziesz mógł wykonywać równoległe zapytania za pomocą tej prostej składni:
Dzieje się tak: dzielimy kolekcję źródeł na 10 części (
.Split(DegreeOfParallelism)
), a następnie uruchamiamy 10 zadań, przetwarzając poszczególne elementy jeden po drugim (.SelectManyAsync(...)
) i scalamy je z powrotem w jedną listę.Warto wspomnieć, że istnieje prostsze podejście:
Ale wymaga to ostrożności : jeśli masz zbyt dużą kolekcję źródeł,
Task
od razu zaplanuje dla każdego elementu, co może spowodować znaczny spadek wydajności.Metody rozszerzenia stosowane w powyższych przykładach wyglądają następująco:
źródło