Chciałbym await
na wyniku BlockingCollection<T>.Take()
asynchronicznie więc nie blokować wątku. Szukasz czegoś takiego:
var item = await blockingCollection.TakeAsync();
Wiem, że mógłbym to zrobić:
var item = await Task.Run(() => blockingCollection.Take());
ale to trochę zabija cały pomysł, ponieważ ThreadPool
zamiast tego blokowany jest inny wątek (of ).
Czy jest jakaś alternatywa?
await Task.Run(() => blockingCollection.Take())
zadania, zostanie ono wykonane w innym wątku, a Twój wątek interfejsu użytkownika nie zostanie zablokowany, czy nie o to chodzi?Task
API. Może być używany na przykład z poziomu ASP.NET. Kod, o którym mowa, nie byłby tam dobrze skalowany.ConfigureAwait
został użyty poRun()
? [red. nieważne, teraz rozumiem, o czym mówisz]Odpowiedzi:
Znam cztery alternatywy.
Pierwsza z nich to kanały , która zapewnia bezpieczną wątkowo kolejkę, która obsługuje operacje asynchroniczne
Read
iWrite
operacje. Kanały są wysoce zoptymalizowane i opcjonalnie obsługują upuszczanie niektórych elementów po osiągnięciu progu.Następny pochodzi
BufferBlock<T>
z TPL Dataflow . Jeśli masz tylko jednego konsumenta, możesz użyćOutputAvailableAsync
lubReceiveAsync
lub po prostu połączyć go z plikiemActionBlock<T>
. Więcej informacji znajdziesz na moim blogu .Ostatnie dwa to typy, które utworzyłem, dostępne w mojej bibliotece AsyncEx .
AsyncCollection<T>
jestasync
prawie odpowiednikiemBlockingCollection<T>
, zdolnym do pakowania równoległej kolekcji producenta / konsumenta, takiej jakConcurrentQueue<T>
lubConcurrentBag<T>
. Możesz użyćTakeAsync
do asynchronicznego korzystania z elementów z kolekcji. Więcej informacji znajdziesz na moim blogu .AsyncProducerConsumerQueue<T>
jestasync
kolejką producent / konsument bardziej kompatybilną z przenośnymi . Możesz użyćDequeueAsync
do asynchronicznego zużywania elementów z kolejki. Więcej informacji znajdziesz na moim blogu .Ostatnie trzy z tych alternatyw umożliwiają synchroniczne i asynchroniczne operacje typu put and take.
źródło
AsyncCollection.TryTakeAsync
, ale nie mogę jej znaleźć w pobranejNito.AsyncEx.Coordination.dll 5.0.0.0
(najnowszej wersji). Plik Nito.AsyncEx.Concurrent.dll, do którego się odwołuje , nie istnieje w pakiecie . czego mi brakuje?while ((result = await collection.TryTakeAsync()).Success) { }
. Dlaczego został usunięty?... lub możesz to zrobić:
using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; public class AsyncQueue<T> { private readonly SemaphoreSlim _sem; private readonly ConcurrentQueue<T> _que; public AsyncQueue() { _sem = new SemaphoreSlim(0); _que = new ConcurrentQueue<T>(); } public void Enqueue(T item) { _que.Enqueue(item); _sem.Release(); } public void EnqueueRange(IEnumerable<T> source) { var n = 0; foreach (var item in source) { _que.Enqueue(item); n++; } _sem.Release(n); } public async Task<T> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken)) { for (; ; ) { await _sem.WaitAsync(cancellationToken); T item; if (_que.TryDequeue(out item)) { return item; } } } }
Prosta, w pełni funkcjonalna asynchroniczna kolejka FIFO.
źródło
for
? jeśli semafor zostanie zwolniony, kolejka ma co najmniej jedną pozycję do usunięcia z kolejki, nie?TryDequeue
jest, zwraca wartość lub w ogóle nie zwraca. Technicznie rzecz biorąc, jeśli masz więcej niż 1 czytnik, ten sam czytelnik może skonsumować dwie (lub więcej) pozycje, zanim jakikolwiek inny czytelnik całkowicie się obudzi. SukcesWaitAsync
to tylko sygnał, że w kolejce mogą znajdować się przedmioty do spożycia, nie jest to gwarancja.If the value of the CurrentCount property is zero before this method is called, the method also allows releaseCount threads or tasks blocked by a call to the Wait or WaitAsync method to enter the semaphore.
z docs.microsoft.com/en-us/dotnet/api/… W jaki sposób udaneWaitAsync
nie ma elementów w kolejce? Jeśli uwolnienie N budzi więcej niż N konsumentów,semaphore
to jest zepsute. Prawda?Oto bardzo podstawowa implementacja,
BlockingCollection
która obsługuje oczekiwanie, z wieloma brakującymi funkcjami. Korzysta zAsyncEnumerable
biblioteki, która umożliwia asynchroniczne wyliczanie dla wersji C # starszych niż 8,0.public class AsyncBlockingCollection<T> { // Missing features: cancellation, boundedCapacity, TakeAsync private Queue<T> _queue = new Queue<T>(); private SemaphoreSlim _semaphore = new SemaphoreSlim(0); private int _consumersCount = 0; private bool _isAddingCompleted; public void Add(T item) { lock (_queue) { if (_isAddingCompleted) throw new InvalidOperationException(); _queue.Enqueue(item); } _semaphore.Release(); } public void CompleteAdding() { lock (_queue) { if (_isAddingCompleted) return; _isAddingCompleted = true; if (_consumersCount > 0) _semaphore.Release(_consumersCount); } } public IAsyncEnumerable<T> GetConsumingEnumerable() { lock (_queue) _consumersCount++; return new AsyncEnumerable<T>(async yield => { while (true) { lock (_queue) { if (_queue.Count == 0 && _isAddingCompleted) break; } await _semaphore.WaitAsync(); bool hasItem; T item = default; lock (_queue) { hasItem = _queue.Count > 0; if (hasItem) item = _queue.Dequeue(); } if (hasItem) await yield.ReturnAsync(item); } }); } }
Przykład użycia:
var abc = new AsyncBlockingCollection<int>(); var producer = Task.Run(async () => { for (int i = 1; i <= 10; i++) { await Task.Delay(100); abc.Add(i); } abc.CompleteAdding(); }); var consumer = Task.Run(async () => { await abc.GetConsumingEnumerable().ForEachAsync(async item => { await Task.Delay(200); await Console.Out.WriteAsync(item + " "); }); }); await Task.WhenAll(producer, consumer);
Wynik:
Aktualizacja: wraz z wydaniem języka C # 8 asynchroniczne wyliczanie stało się funkcją języka wbudowanego. Wymagane klasy (
IAsyncEnumerable
,IAsyncEnumerator
) są osadzone w .NET Core 3.0 i są oferowane jako pakiet dla .NET Framework 4.6.1+ ( Microsoft.Bcl.AsyncInterfaces ).Oto alternatywna
GetConsumingEnumerable
implementacja z nową składnią C # 8:public async IAsyncEnumerable<T> GetConsumingEnumerable() { lock (_queue) _consumersCount++; while (true) { lock (_queue) { if (_queue.Count == 0 && _isAddingCompleted) break; } await _semaphore.WaitAsync(); bool hasItem; T item = default; lock (_queue) { hasItem = _queue.Count > 0; if (hasItem) item = _queue.Dequeue(); } if (hasItem) yield return item; } }
Zwróć uwagę na współistnienie
await
iyield
w tej samej metodzie.Przykład użycia (C # 8):
var consumer = Task.Run(async () => { await foreach (var item in abc.GetConsumingEnumerable()) { await Task.Delay(200); await Console.Out.WriteAsync(item + " "); } });
Zwróć uwagę na
await
przedforeach
.źródło
AsyncBlockingCollection
jest bezsensowna. Coś nie może być jednocześnie asynchroniczne i blokujące, ponieważ te dwie koncepcje są dokładnymi przeciwieństwami!Jeśli nie masz nic przeciwko drobnemu włamaniu, możesz wypróbować te rozszerzenia.
public static async Task AddAsync<TEntity>( this BlockingCollection<TEntity> Bc, TEntity item, CancellationToken abortCt) { while (true) { try { if (Bc.TryAdd(item, 0, abortCt)) return; else await Task.Delay(100, abortCt); } catch (Exception) { throw; } } } public static async Task<TEntity> TakeAsync<TEntity>( this BlockingCollection<TEntity> Bc, CancellationToken abortCt) { while (true) { try { TEntity item; if (Bc.TryTake(out item, 0, abortCt)) return item; else await Task.Delay(100, abortCt); } catch (Exception) { throw; } } }
źródło