Tworzenie blokującej kolejki Queue <T> w .NET?

163

Mam scenariusz, w którym mam wiele wątków dodających do kolejki i wiele wątków odczytujących z tej samej kolejki. Jeśli kolejka osiągnie określony rozmiar, wszystkie wątki , które wypełniają kolejkę, zostaną zablokowane przy dodawaniu, dopóki element nie zostanie usunięty z kolejki.

Poniższe rozwiązanie jest tym, czego teraz używam, a moje pytanie brzmi: Jak można to poprawić? Czy istnieje obiekt, który już włącza to zachowanie w BCL, którego powinienem używać?

internal class BlockingCollection<T> : CollectionBase, IEnumerable
{
    //todo: might be worth changing this into a proper QUEUE

    private AutoResetEvent _FullEvent = new AutoResetEvent(false);

    internal T this[int i]
    {
        get { return (T) List[i]; }
    }

    private int _MaxSize;
    internal int MaxSize
    {
        get { return _MaxSize; }
        set
        {
            _MaxSize = value;
            checkSize();
        }
    }

    internal BlockingCollection(int maxSize)
    {
        MaxSize = maxSize;
    }

    internal void Add(T item)
    {
        Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId));

        _FullEvent.WaitOne();

        List.Add(item);

        Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId));

        checkSize();
    }

    internal void Remove(T item)
    {
        lock (List)
        {
            List.Remove(item);
        }

        Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId));
    }

    protected override void OnRemoveComplete(int index, object value)
    {
        checkSize();
        base.OnRemoveComplete(index, value);
    }

    internal new IEnumerator GetEnumerator()
    {
        return List.GetEnumerator();
    }

    private void checkSize()
    {
        if (Count < MaxSize)
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Set();
        }
        else
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Reset();
        }
    }
}
Eric Schoonover
źródło
5
.Net jak ma wbudowane klasy, które pomogą w tym scenariuszu. Większość wymienionych tutaj odpowiedzi jest nieaktualna. Zobacz najnowsze odpowiedzi na dole. Przejrzyj kolekcje blokujące bezpieczne wątkowo. Odpowiedzi mogą być nieaktualne, ale to wciąż dobre pytanie!
Tom A,
Myślę, że nadal dobrym pomysłem jest poznanie Monitor.Wait / Pulse / PulseAll, nawet jeśli mamy nowe współbieżne klasy w .NET.
thewpfguy
1
Zgadzam się z @thewpfguy. Będziesz chciał zrozumieć podstawowe mechanizmy blokowania za kulisami. Warto również zauważyć, że Systems.Collections.Concurrent nie istniał do kwietnia 2010 roku, a potem tylko w Visual Studio 2010 i nowszych. Zdecydowanie nie ma opcji dla VS2008 wstrzymujących się ...
Vic
Jeśli czytasz to teraz, spójrz na System.Threading.Channels dla implementacji tego z wieloma zapisami / wieloma czytelnikami, ograniczonej, opcjonalnie blokującej dla .NET Core i .NET Standard.
Mark Rendle

Odpowiedzi:

200

Wygląda to bardzo niebezpiecznie (bardzo mała synchronizacja); co powiesz na coś takiego:

class SizeQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly int maxSize;
    public SizeQueue(int maxSize) { this.maxSize = maxSize; }

    public void Enqueue(T item)
    {
        lock (queue)
        {
            while (queue.Count >= maxSize)
            {
                Monitor.Wait(queue);
            }
            queue.Enqueue(item);
            if (queue.Count == 1)
            {
                // wake up any blocked dequeue
                Monitor.PulseAll(queue);
            }
        }
    }
    public T Dequeue()
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                Monitor.Wait(queue);
            }
            T item = queue.Dequeue();
            if (queue.Count == maxSize - 1)
            {
                // wake up any blocked enqueue
                Monitor.PulseAll(queue);
            }
            return item;
        }
    }
}

(edytować)

W rzeczywistości chciałbyś mieć sposób na zamknięcie kolejki, aby czytelnicy zaczęli czysto wychodzić - być może coś w rodzaju flagi bool - jeśli jest ustawiona, pusta kolejka po prostu wraca (zamiast blokować):

bool closing;
public void Close()
{
    lock(queue)
    {
        closing = true;
        Monitor.PulseAll(queue);
    }
}
public bool TryDequeue(out T value)
{
    lock (queue)
    {
        while (queue.Count == 0)
        {
            if (closing)
            {
                value = default(T);
                return false;
            }
            Monitor.Wait(queue);
        }
        value = queue.Dequeue();
        if (queue.Count == maxSize - 1)
        {
            // wake up any blocked enqueue
            Monitor.PulseAll(queue);
        }
        return true;
    }
}
Marc Gravell
źródło
1
Co powiesz na zmianę czekania na WaitAny i przekazanie zakończenia czekania na budowę ...
Sam Saffron
1
@ Marc - optymalizacją, jeśli oczekujesz, że kolejka zawsze osiąga pojemność, byłoby przekazanie wartości maxSize do konstruktora Queue <T>. Możesz dodać innego konstruktora do swojej klasy, aby się do tego dostosować.
RichardOD
3
Dlaczego SizeQueue, dlaczego nie FixedSizeQueue?
mindless.panda
4
@Lasse - zwalnia blokadę (i) w trakcie Wait, aby inne wątki mogły ją zdobyć. Po przebudzeniu odzyskuje blokadę (i).
Marc Gravell
1
Fajnie, jak powiedziałem, było coś, czego nie rozumiałem :) To z pewnością sprawia, że ​​chcę ponownie odwiedzić część mojego kodu wątku ....
Lasse V. Karlsen
14

„Jak można to poprawić?”

Cóż, musisz przyjrzeć się każdej metodzie w swojej klasie i zastanowić się, co by się stało, gdyby inny wątek jednocześnie wywoływał tę metodę lub inną metodę. Na przykład blokada jest umieszczana w metodzie Remove, ale nie w metodzie Add. Co się stanie, jeśli jeden wątek zostanie dodany w tym samym czasie co inny wątek zostanie usunięty? Złe rzeczy.

Weź również pod uwagę, że metoda może zwrócić drugi obiekt, który zapewnia dostęp do wewnętrznych danych pierwszego obiektu - na przykład GetEnumerator. Wyobraź sobie, że jeden wątek przechodzi przez ten moduł wyliczający, a inny wątek modyfikuje listę w tym samym czasie. Niedobrze.

Dobrą praktyczną zasadą jest uproszczenie tego rozwiązania poprzez zmniejszenie liczby metod w klasie do absolutnego minimum.

W szczególności nie dziedzicz innej klasy kontenera, ponieważ ujawnisz wszystkie metody tej klasy, umożliwiając wywołującemu uszkodzenie wewnętrznych danych lub zobaczenie częściowo kompletnych zmian w danych (tak samo źle, ponieważ dane wydaje się w tym momencie uszkodzony). Ukryj wszystkie szczegóły i bądź całkowicie bezlitosny w kwestii tego, jak udzielasz do nich dostępu.

Zdecydowanie radzę korzystać z gotowych rozwiązań - zdobądź książkę o wątkach lub skorzystaj z biblioteki innej firmy. W przeciwnym razie, biorąc pod uwagę to, czego próbujesz, będziesz debugować swój kod przez długi czas.

Poza tym, czy nie miałoby większego sensu, gdyby funkcja Remove zwracała element (powiedzmy, ten, który został dodany jako pierwszy, ponieważ jest to kolejka), zamiast wybierania przez dzwoniącego określonego elementu? A kiedy kolejka jest pusta, być może Usuń również powinno się zablokować.

Aktualizacja: odpowiedź Marca faktycznie uwzględnia wszystkie te sugestie! :) Ale zostawię to tutaj, ponieważ może być pomocne zrozumienie, dlaczego jego wersja jest tak ulepszona.

Daniel Earwicker
źródło
12

Możesz użyć BlockingCollection i ConcurrentQueue w przestrzeni nazw System.Collections.Concurrent

 public class ProducerConsumerQueue<T> : BlockingCollection<T>
{
    /// <summary>
    /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
    /// </summary>
    public ProducerConsumerQueue()  
        : base(new ConcurrentQueue<T>())
    {
    }

  /// <summary>
  /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
  /// </summary>
  /// <param name="maxSize"></param>
    public ProducerConsumerQueue(int maxSize)
        : base(new ConcurrentQueue<T>(), maxSize)
    {
    }



}
Andreas
źródło
3
BlockingCollection domyślnie Queue. Więc nie sądzę, żeby to było konieczne.
Curtis White
Czy BlockingCollection zachowuje porządkowanie jak kolejka?
joelc
Tak, kiedy jest zainicjowany przez ConcurrentQueue
Andreas
6

Właśnie podrzuciłem to za pomocą Reactive Extensions i przypomniałem sobie to pytanie:

public class BlockingQueue<T>
{
    private readonly Subject<T> _queue;
    private readonly IEnumerator<T> _enumerator;
    private readonly object _sync = new object();

    public BlockingQueue()
    {
        _queue = new Subject<T>();
        _enumerator = _queue.GetEnumerator();
    }

    public void Enqueue(T item)
    {
        lock (_sync)
        {
            _queue.OnNext(item);
        }
    }

    public T Dequeue()
    {
        _enumerator.MoveNext();
        return _enumerator.Current;
    }
}

Niekoniecznie całkowicie bezpieczne, ale bardzo proste.

Mark Rendle
źródło
Co to jest Subject <t>? Nie mam żadnego programu rozpoznawania nazw dla jego przestrzeni nazw.
Jerm
To część rozszerzeń reaktywnych.
Mark Rendle
Nie ma odpowiedzi. To wcale nie odpowiada na pytanie.
makhdumi
5

To jest to, co przyszedłem do bezpiecznej, ograniczonej kolejki blokowania wątków.

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

public class BlockingBuffer<T>
{
    private Object t_lock;
    private Semaphore sema_NotEmpty;
    private Semaphore sema_NotFull;
    private T[] buf;

    private int getFromIndex;
    private int putToIndex;
    private int size;
    private int numItems;

    public BlockingBuffer(int Capacity)
    {
        if (Capacity <= 0)
            throw new ArgumentOutOfRangeException("Capacity must be larger than 0");

        t_lock = new Object();
        buf = new T[Capacity];
        sema_NotEmpty = new Semaphore(0, Capacity);
        sema_NotFull = new Semaphore(Capacity, Capacity);
        getFromIndex = 0;
        putToIndex = 0;
        size = Capacity;
        numItems = 0;
    }

    public void put(T item)
    {
        sema_NotFull.WaitOne();
        lock (t_lock)
        {
            while (numItems == size)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            buf[putToIndex++] = item;

            if (putToIndex == size)
                putToIndex = 0;

            numItems++;

            Monitor.Pulse(t_lock);

        }
        sema_NotEmpty.Release();


    }

    public T take()
    {
        T item;

        sema_NotEmpty.WaitOne();
        lock (t_lock)
        {

            while (numItems == 0)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            item = buf[getFromIndex++];

            if (getFromIndex == size)
                getFromIndex = 0;

            numItems--;

            Monitor.Pulse(t_lock);

        }
        sema_NotFull.Release();

        return item;
    }
}
Kevin
źródło
Czy możesz podać przykłady kodu, w jaki sposób ustawiłbym w kolejce niektóre funkcje wątku przy użyciu tej biblioteki, w tym sposób utworzenia wystąpienia tej klasy?
Jerm
To pytanie / odpowiedź jest nieco przestarzałe. Powinieneś przyjrzeć się przestrzeni nazw System.Collections.Concurrent, aby uzyskać blokowanie obsługi kolejek.
Kevin
2

Nie do końca poznałem TPL, ale mogą mieć coś, co pasuje do twoich potrzeb lub przynajmniej trochę pożywienia Reflector, z którego można czerpać inspirację.

Mam nadzieję, że to pomoże.

The MissingLINQ
źródło
Zdaję sobie sprawę, że to jest stare, ale mój komentarz jest dla nowicjuszy w SO, ponieważ OP już o tym wie. To nie jest odpowiedź, to powinien być komentarz.
John Demetriou
0

Cóż, możesz spojrzeć na System.Threading.Semaphoreklasę. Poza tym - nie, musisz to zrobić sam. AFAIK nie ma takiej wbudowanej kolekcji.

Vilx-
źródło
Patrzyłem na to pod kątem ograniczania liczby wątków, które uzyskują dostęp do zasobu, ale nie pozwala to na zablokowanie całego dostępu do zasobu na podstawie jakiegoś warunku (np. Collection.Count). AFAIK w każdym razie
Eric Schoonover
Cóż, robisz tę część sam, tak jak teraz. Po prostu zamiast MaxSize i _FullEvent masz Semaphore, który inicjujesz z odpowiednią liczbą w konstruktorze. Następnie, po każdym dodaniu / usunięciu wywołania WaitForOne () lub Release ().
Vilx
Nie różni się zbytnio od tego, co masz teraz. Po prostu prostsze IMHO.
Vilx
Czy możesz podać przykład pokazujący, że to działa? Nie widziałem, jak dynamicznie dostosować rozmiar semafora, czego wymaga ten scenariusz. Ponieważ musisz mieć możliwość blokowania wszystkich zasobów tylko wtedy, gdy kolejka jest pełna.
Eric Schoonover
Ach, zmiana rozmiaru! Dlaczego nie powiedziałeś tego od razu? OK, więc semafor nie jest dla ciebie. Powodzenia w takim podejściu!
Vilx
-1

Jeśli chcesz mieć maksymalną przepustowość, pozwalającą wielu czytelnikom na czytanie i tylko jednemu pisarzowi do pisania, BCL ma coś, co nazywa się ReaderWriterLockSlim, co powinno pomóc odchudzić twój kod ...

DavidN
źródło
Nie chcę jednak, aby ktokolwiek mógł pisać, jeśli kolejka jest pełna.
Eric Schoonover
Więc łączysz to z zamkiem. Oto kilka bardzo dobrych przykładów albahari.com/threading/part2.aspx#_ProducerConsumerQWaitHandle albahari.com/threading/part4.aspx
DavidN
3
Dzięki queue / dequeue każdy jest pisarzem ... ekskluzywna blokada może być bardziej pragmatyczna
Marc Gravell
Zdaję sobie sprawę, że to jest stare, ale mój komentarz jest dla nowicjuszy w SO, ponieważ OP już o tym wie. To nie jest odpowiedź, to powinien być komentarz.
John Demetriou