Kolejka o stałym rozmiarze, która automatycznie usuwa z kolejki stare wartości po nowych kodach

121

Używam ConcurrentQueuedo udostępnionej struktury danych, której celem jest przechowywanie ostatnich N przekazanych do niej obiektów (rodzaj historii).

Załóżmy, że mamy przeglądarkę i chcemy mieć 100 ostatnio przeglądanych adresów URL. Chcę kolejki, która automatycznie upuszcza (usuwa z kolejki) najstarszy (pierwszy) wpis po wstawieniu nowego wpisu (kolejce), gdy pojemność się zapełni (100 adresów w historii).

Jak mogę to osiągnąć za pomocą System.Collections?

Xaqron
źródło
Nie było to przeznaczone specjalnie dla Ciebie, ale dla każdego, kto napotka to pytanie i może uznać je za przydatne. btw, mówi też o C #. Czy udało Ci się przeczytać wszystkie odpowiedzi (w 2 minuty) i zorientować się, że nie ma tam kodu C #? Zresztą sam nie jestem pewien i stąd jest to komentarz ...
Możesz po prostu zawinąć metody w kłódkę. Biorąc pod uwagę, że są szybkie, możesz po prostu zablokować całą tablicę. Jest to prawdopodobnie oszustwo. Wyszukiwanie implementacji bufora cyklicznego z kodem C # może Cię znaleźć. W każdym razie, powodzenia.

Odpowiedzi:

111

Napisałbym klasę opakowującą, która w Enqueue sprawdzałaby Count, a następnie Dequeue, gdy liczba przekroczy limit.

 public class FixedSizedQueue<T>
 {
     ConcurrentQueue<T> q = new ConcurrentQueue<T>();
     private object lockObject = new object();

     public int Limit { get; set; }
     public void Enqueue(T obj)
     {
        q.Enqueue(obj);
        lock (lockObject)
        {
           T overflow;
           while (q.Count > Limit && q.TryDequeue(out overflow)) ;
        }
     }
 }
Richard Schneider
źródło
4
qjest prywatny dla obiektu, więc lockuniemożliwi równoczesny dostęp innym wątkom.
Richard Schneider
14
Blokowanie nie jest dobrym pomysłem. Celem kolekcji współbieżnych BCL jest zapewnienie współbieżności bez blokad ze względu na wydajność. Blokowanie kodu naraża na szwank tę korzyść. W rzeczywistości nie widzę powodu, dla którego musisz blokować deq.
KFL,
2
@KFL, należy zablokować, ponieważ Counti TryDequeuesą to dwie niezależne operacje, które nie są synchronizowane przez BCL Concurrent.
Richard Schneider
9
@RichardSchneider Jeśli musisz samodzielnie poradzić sobie z problemami współbieżności, dobrym pomysłem byłoby zamiana ConcurrentQueue<T>obiektu na Queue<T>obiekt, który jest lżejszy.
0b101010
6
Nie definiuj własnej kolejki, po prostu użyj odziedziczonej. Jeśli zrobisz to, co robisz, nie możesz właściwie zrobić nic więcej z wartościami kolejki, wszystkimi innymi funkcjami, ale nowa Enqueuenadal będzie wywoływać oryginalną kolejkę. Innymi słowy, chociaż ta odpowiedź jest oznaczona jako zaakceptowana, jest całkowicie i całkowicie złamana.
Gábor
104

Wybrałbym niewielki wariant ... rozszerzyć ConcurrentQueue, aby móc używać rozszerzeń Linq w FixedSizeQueue

public class FixedSizedQueue<T> : ConcurrentQueue<T>
{
    private readonly object syncObject = new object();

    public int Size { get; private set; }

    public FixedSizedQueue(int size)
    {
        Size = size;
    }

    public new void Enqueue(T obj)
    {
        base.Enqueue(obj);
        lock (syncObject)
        {
            while (base.Count > Size)
            {
                T outObj;
                base.TryDequeue(out outObj);
            }
        }
    }
}
Dave Lawrence
źródło
1
co się dzieje, gdy ktoś statycznie zna instancję jako ConcurrentQueue <T>, właśnie ominął Twoje słowo kluczowe „nowe”.
mhand
6
@mhand Jeśli „ktoś” chciał to zrobić; wtedy wybraliby na początek obiekt ConcurrentQueue <T> ... To jest niestandardowa klasa pamięci. Nikt nie chce, aby to zostało przesłane do platformy .NET. Próbowałeś stworzyć problem ze względu na to.
Dave Lawrence,
9
Chodzi mi o to, że zamiast podklasy można po prostu zawinąć kolejkę ... to wymusza pożądane zachowanie we wszystkich przypadkach. Ponadto, ponieważ jest to niestandardowa klasa pamięci, zróbmy ją całkowicie niestandardową, ujawniaj tylko operacje, których potrzebujemy, podklasa jest tutaj niewłaściwym narzędziem IMHO.
mhand
3
@mhand Tak, rozumiem to, co mówisz ... Mógłbym zawinąć kolejkę i ujawnić moduł wyliczający kolejki, aby skorzystać z rozszerzeń Linq.
Dave Lawrence
1
zgadzam się z @mhand, nie powinieneś dziedziczyć ConcurrentQueue, ponieważ metoda Enqueue nie jest wirtualna. W razie potrzeby należy proxy kolejki i zaimplementować cały interfejs.
Chris Marisic
29

Dla każdego, kto uzna to za przydatne, oto działający kod oparty na odpowiedzi Richarda Schneidera powyżej:

public class FixedSizedQueue<T>
{
    readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public int Size { get; private set; }

    public FixedSizedQueue(int size)
    {
        Size = size;
    }

    public void Enqueue(T obj)
    {
        queue.Enqueue(obj);

        while (queue.Count > Size)
        {
            T outObj;
            queue.TryDequeue(out outObj);
        }
    }
}
Tod Thomson
źródło
1
Głosowanie w dół z wymienionych powodów (blokowanie podczas korzystania z ConcurrentQueue jest złe), a także nie implementowanie żadnego z wymaganych interfejsów, aby była to prawdziwa kolekcja.
Josh
11

Oto lekki bufor kołowy z niektórymi metodami oznaczonymi jako bezpieczne i niebezpieczne.

public class CircularBuffer<T> : IEnumerable<T>
{
    readonly int size;
    readonly object locker;

    int count;
    int head;
    int rear;
    T[] values;

    public CircularBuffer(int max)
    {
        this.size = max;
        locker = new object();
        count = 0;
        head = 0;
        rear = 0;
        values = new T[size];
    }

    static int Incr(int index, int size)
    {
        return (index + 1) % size;
    }

    private void UnsafeEnsureQueueNotEmpty()
    {
        if (count == 0)
            throw new Exception("Empty queue");
    }

    public int Size { get { return size; } }
    public object SyncRoot { get { return locker; } }

    #region Count

    public int Count { get { return UnsafeCount; } }
    public int SafeCount { get { lock (locker) { return UnsafeCount; } } }
    public int UnsafeCount { get { return count; } }

    #endregion

    #region Enqueue

    public void Enqueue(T obj)
    {
        UnsafeEnqueue(obj);
    }

    public void SafeEnqueue(T obj)
    {
        lock (locker) { UnsafeEnqueue(obj); }
    }

    public void UnsafeEnqueue(T obj)
    {
        values[rear] = obj;

        if (Count == Size)
            head = Incr(head, Size);
        rear = Incr(rear, Size);
        count = Math.Min(count + 1, Size);
    }

    #endregion

    #region Dequeue

    public T Dequeue()
    {
        return UnsafeDequeue();
    }

    public T SafeDequeue()
    {
        lock (locker) { return UnsafeDequeue(); }
    }

    public T UnsafeDequeue()
    {
        UnsafeEnsureQueueNotEmpty();

        T res = values[head];
        values[head] = default(T);
        head = Incr(head, Size);
        count--;

        return res;
    }

    #endregion

    #region Peek

    public T Peek()
    {
        return UnsafePeek();
    }

    public T SafePeek()
    {
        lock (locker) { return UnsafePeek(); }
    }

    public T UnsafePeek()
    {
        UnsafeEnsureQueueNotEmpty();

        return values[head];
    }

    #endregion


    #region GetEnumerator

    public IEnumerator<T> GetEnumerator()
    {
        return UnsafeGetEnumerator();
    }

    public IEnumerator<T> SafeGetEnumerator()
    {
        lock (locker)
        {
            List<T> res = new List<T>(count);
            var enumerator = UnsafeGetEnumerator();
            while (enumerator.MoveNext())
                res.Add(enumerator.Current);
            return res.GetEnumerator();
        }
    }

    public IEnumerator<T> UnsafeGetEnumerator()
    {
        int index = head;
        for (int i = 0; i < count; i++)
        {
            yield return values[index];
            index = Incr(index, size);
        }
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return this.GetEnumerator();
    }

    #endregion
}

Lubię korzystać z Foo()/SafeFoo()/UnsafeFoo()konwencji:

  • Foometody wywoływane UnsafeFoojako domyślne.
  • UnsafeFoo metody swobodnie modyfikują stan bez blokady, powinny wywoływać tylko inne niebezpieczne metody.
  • SafeFoometody wywołują UnsafeFoometody wewnątrz blokady.

Jest to trochę rozwlekłe, ale powoduje oczywiste błędy, takie jak wywoływanie niebezpiecznych metod poza blokadą w metodzie, która ma być bezpieczna dla wątków, bardziej widoczna.

Julia
źródło
5

Oto moje spojrzenie na kolejkę o stałym rozmiarze

Używa zwykłej kolejki, aby uniknąć obciążenia synchronizacji, gdy Countwłaściwość jest używana ConcurrentQueue. Implementuje również, IReadOnlyCollectiondzięki czemu można używać metod LINQ. Reszta jest bardzo podobna do innych odpowiedzi tutaj.

[Serializable]
[DebuggerDisplay("Count = {" + nameof(Count) + "}, Limit = {" + nameof(Limit) + "}")]
public class FixedSizedQueue<T> : IReadOnlyCollection<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly object _lock = new object();

    public int Count { get { lock (_lock) { return _queue.Count; } } }
    public int Limit { get; }

    public FixedSizedQueue(int limit)
    {
        if (limit < 1)
            throw new ArgumentOutOfRangeException(nameof(limit));

        Limit = limit;
    }

    public FixedSizedQueue(IEnumerable<T> collection)
    {
        if (collection is null || !collection.Any())
           throw new ArgumentException("Can not initialize the Queue with a null or empty collection", nameof(collection));

        _queue = new Queue<T>(collection);
        Limit = _queue.Count;
    }

    public void Enqueue(T obj)
    {
        lock (_lock)
        {
            _queue.Enqueue(obj);

            while (_queue.Count > Limit)
                _queue.Dequeue();
        }
    }

    public void Clear()
    {
        lock (_lock)
            _queue.Clear();
    }

    public IEnumerator<T> GetEnumerator()
    {
        lock (_lock)
            return new List<T>(_queue).GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}
Ali Zahid
źródło
3

Dla zabawy, oto kolejna implementacja, która moim zdaniem rozwiązuje większość obaw komentujących. W szczególności bezpieczeństwo gwintów uzyskuje się bez blokowania, a implementacja jest ukryta przez klasę zawijania.

public class FixedSizeQueue<T> : IReadOnlyCollection<T>
{
  private ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
  private int _count;

  public int Limit { get; private set; }

  public FixedSizeQueue(int limit)
  {
    this.Limit = limit;
  }

  public void Enqueue(T obj)
  {
    _queue.Enqueue(obj);
    Interlocked.Increment(ref _count);

    // Calculate the number of items to be removed by this thread in a thread safe manner
    int currentCount;
    int finalCount;
    do
    {
      currentCount = _count;
      finalCount = Math.Min(currentCount, this.Limit);
    } while (currentCount != 
      Interlocked.CompareExchange(ref _count, finalCount, currentCount));

    T overflow;
    while (currentCount > finalCount && _queue.TryDequeue(out overflow))
      currentCount--;
  }

  public int Count
  {
    get { return _count; }
  }

  public IEnumerator<T> GetEnumerator()
  {
    return _queue.GetEnumerator();
  }

  System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
  {
    return _queue.GetEnumerator();
  }
}
erdomke
źródło
1
Jest to zepsute, jeśli jest używane jednocześnie - co się stanie, jeśli wątek jest wywłaszczany po wywołaniu, _queue.Enqueue(obj)ale przed Interlocked.Increment(ref _count), a inne wywołania wątku .Count? Wynik byłby błędny. Nie sprawdziłem innych problemów.
KFL,
3

Moja wersja jest po prostu podklasą normalnych Queue... nic specjalnego, ale widząc wszystkich uczestniczących i nadal jest zgodna z tytułem tematu, równie dobrze mógłbym to tutaj umieścić. Na wszelki wypadek zwraca również te usunięte z kolejki.

public sealed class SizedQueue<T> : Queue<T>
{
    public int FixedCapacity { get; }
    public SizedQueue(int fixedCapacity)
    {
        this.FixedCapacity = fixedCapacity;
    }

    /// <summary>
    /// If the total number of item exceed the capacity, the oldest ones automatically dequeues.
    /// </summary>
    /// <returns>The dequeued value, if any.</returns>
    public new T Enqueue(T item)
    {
        base.Enqueue(item);
        if (base.Count > FixedCapacity)
        {
            return base.Dequeue();
        }
        return default;
    }
}
5argon
źródło
2

Dodajmy jeszcze jedną odpowiedź. Dlaczego to ponad innymi?

1) Prostota. Próba zagwarantowania, że ​​rozmiar jest dobry i dobry, ale prowadzi do niepotrzebnej złożoności, która może powodować własne problemy.

2) Implementuje IReadOnlyCollection, co oznacza, że ​​możesz użyć Linq na nim i przekazać go do różnych rzeczy, które oczekują IEnumerable.

3) Bez blokowania. Wiele z powyższych rozwiązań wykorzystuje zamki, co jest nieprawidłowe w przypadku kolekcji bez zamków.

4) Implementuje ten sam zestaw metod, właściwości i interfejsów, co ConcurrentQueue, w tym IProducerConsumerCollection, co jest ważne, jeśli chcesz używać kolekcji z BlockingCollection.

Ta implementacja może potencjalnie zakończyć się większą liczbą wpisów niż oczekiwano, jeśli TryDequeue zawiedzie, ale częstotliwość tego nie wydaje się warta wyspecjalizowanego kodu, który nieuchronnie obniży wydajność i spowoduje własne nieoczekiwane problemy.

Jeśli absolutnie chcesz zagwarantować rozmiar, zaimplementowanie metody Prune () lub podobnej wydaje się najlepszym pomysłem. Możesz użyć blokady odczytu ReaderWriterLockSlim w innych metodach (w tym TryDequeue) i przyjąć blokadę zapisu tylko podczas czyszczenia.

class ConcurrentFixedSizeQueue<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>, ICollection {
    readonly ConcurrentQueue<T> m_concurrentQueue;
    readonly int m_maxSize;

    public int Count => m_concurrentQueue.Count;
    public bool IsEmpty => m_concurrentQueue.IsEmpty;

    public ConcurrentFixedSizeQueue (int maxSize) : this(Array.Empty<T>(), maxSize) { }

    public ConcurrentFixedSizeQueue (IEnumerable<T> initialCollection, int maxSize) {
        if (initialCollection == null) {
            throw new ArgumentNullException(nameof(initialCollection));
        }

        m_concurrentQueue = new ConcurrentQueue<T>(initialCollection);
        m_maxSize = maxSize;
    }

    public void Enqueue (T item) {
        m_concurrentQueue.Enqueue(item);

        if (m_concurrentQueue.Count > m_maxSize) {
            T result;
            m_concurrentQueue.TryDequeue(out result);
        }
    }

    public void TryPeek (out T result) => m_concurrentQueue.TryPeek(out result);
    public bool TryDequeue (out T result) => m_concurrentQueue.TryDequeue(out result);

    public void CopyTo (T[] array, int index) => m_concurrentQueue.CopyTo(array, index);
    public T[] ToArray () => m_concurrentQueue.ToArray();

    public IEnumerator<T> GetEnumerator () => m_concurrentQueue.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator () => GetEnumerator();

    // Explicit ICollection implementations.
    void ICollection.CopyTo (Array array, int index) => ((ICollection)m_concurrentQueue).CopyTo(array, index);
    object ICollection.SyncRoot => ((ICollection) m_concurrentQueue).SyncRoot;
    bool ICollection.IsSynchronized => ((ICollection) m_concurrentQueue).IsSynchronized;

    // Explicit IProducerConsumerCollection<T> implementations.
    bool IProducerConsumerCollection<T>.TryAdd (T item) => ((IProducerConsumerCollection<T>) m_concurrentQueue).TryAdd(item);
    bool IProducerConsumerCollection<T>.TryTake (out T item) => ((IProducerConsumerCollection<T>) m_concurrentQueue).TryTake(out item);

    public override int GetHashCode () => m_concurrentQueue.GetHashCode();
    public override bool Equals (object obj) => m_concurrentQueue.Equals(obj);
    public override string ToString () => m_concurrentQueue.ToString();
}
Josh
źródło
2

Tylko dlatego, że nikt jeszcze tego nie powiedział ... możesz użyć LinkedList<T>ai dodać zabezpieczenie wątku:

public class Buffer<T> : LinkedList<T>
{
    private int capacity;

    public Buffer(int capacity)
    {
        this.capacity = capacity;   
    }

    public void Enqueue(T item)
    {
        // todo: add synchronization mechanism
        if (Count == capacity) RemoveLast();
        AddFirst(item);
    }

    public T Dequeue()
    {
        // todo: add synchronization mechanism
        var last = Last.Value;
        RemoveLast();
        return last;
    }
}

Należy zwrócić uwagę na to, że w tym przykładzie domyślną kolejnością wyliczania będzie LIFO. Ale w razie potrzeby można to zmienić.

Brandon
źródło
1

Dla przyjemności z kodowania przekazuję Ci ' ConcurrentDeck'

public class ConcurrentDeck<T>
{
   private readonly int _size;
   private readonly T[] _buffer;
   private int _position = 0;

   public ConcurrentDeck(int size)
   {
       _size = size;
       _buffer = new T[size];
   }

   public void Push(T item)
   {
       lock (this)
       {
           _buffer[_position] = item;
           _position++;
           if (_position == _size) _position = 0;
       }
   }

   public T[] ReadDeck()
   {
       lock (this)
       {
           return _buffer.Skip(_position).Union(_buffer.Take(_position)).ToArray();
       }
   }
}

Przykładowe zastosowanie:

void Main()
{
    var deck = new ConcurrentDeck<Tuple<string,DateTime>>(25);
    var handle = new ManualResetEventSlim();
    var task1 = Task.Factory.StartNew(()=>{
    var timer = new System.Timers.Timer();
    timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>("task1",DateTime.Now));};
    timer.Interval = System.TimeSpan.FromSeconds(1).TotalMilliseconds;
    timer.Enabled = true;
    handle.Wait();
    }); 
    var task2 = Task.Factory.StartNew(()=>{
    var timer = new System.Timers.Timer();
    timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>("task2",DateTime.Now));};
    timer.Interval = System.TimeSpan.FromSeconds(.5).TotalMilliseconds;
    timer.Enabled = true;
    handle.Wait();
    }); 
    var task3 = Task.Factory.StartNew(()=>{
    var timer = new System.Timers.Timer();
    timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>("task3",DateTime.Now));};
    timer.Interval = System.TimeSpan.FromSeconds(.25).TotalMilliseconds;
    timer.Enabled = true;
    handle.Wait();
    }); 
    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(10));
    handle.Set();
    var outputtime = DateTime.Now;
    deck.ReadDeck().Select(d => new {Message = d.Item1, MilliDiff = (outputtime - d.Item2).TotalMilliseconds}).Dump(true);
}
Chris Hayes
źródło
1
Podoba mi się ta implementacja, ale zauważam, że gdy żadna nie została dodana, zwraca wartość domyślną (T)
Daniel Leach,
Jeśli używasz blokady w ten sposób, powinieneś użyć ReaderWriterLockSlim, aby nadać priorytet swoim czytelnikom.
Josh
1

Cóż, zależy to od zastosowania, które zauważyłem, że niektóre z powyższych rozwiązań mogą przekraczać rozmiar, gdy są używane w środowisku wielowątkowym. W każdym razie moim przypadkiem użycia było wyświetlenie ostatnich 5 zdarzeń i istnieje wiele wątków zapisujących zdarzenia w kolejce i jeden inny wątek odczytujący z niej i wyświetlający go w kontrolce Winform. Więc to było moje rozwiązanie.

EDYCJA: Ponieważ używamy już blokowania w naszej implementacji, tak naprawdę nie potrzebujemy ConcurrentQueue, może to poprawić wydajność.

class FixedSizedConcurrentQueue<T> 
{
    readonly Queue<T> queue = new Queue<T>();
    readonly object syncObject = new object();

    public int MaxSize { get; private set; }

    public FixedSizedConcurrentQueue(int maxSize)
    {
        MaxSize = maxSize;
    }

    public void Enqueue(T obj)
    {
        lock (syncObject)
        {
            queue.Enqueue(obj);
            while (queue.Count > MaxSize)
            {
                queue.Dequeue();
            }
        }
    }

    public T[] ToArray()
    {
        T[] result = null;
        lock (syncObject)
        {
            result = queue.ToArray();
        }

        return result;
    }

    public void Clear()
    {
        lock (syncObject)
        {
            queue.Clear();
        }
    }
}

EDYCJA: Naprawdę nie potrzebujemy syncObjectw powyższym przykładzie i możemy raczej użyć queueobiektu, ponieważ nie jesteśmy ponownie inicjalizowani queuew żadnej funkcji i jest oznaczony jako readonlymimo wszystko.

Mubaszar
źródło
0

Zaakceptowana odpowiedź będzie miała możliwe do uniknięcia skutki uboczne.

Drobnoziarniste mechanizmy blokujące i wolne od zamków

Poniższe linki to odniesienia, których użyłem, kiedy pisałem mój przykład poniżej.

Chociaż dokumentacja firmy Microsoft jest nieco myląca, ponieważ używają blokady, blokują jednak klasy segregacji. Same klasy segmentów używają Interlocked.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

namespace Lib.Core
{
    // Sources: 
    // https://docs.microsoft.com/en-us/dotnet/standard/collections/thread-safe/
    // https://docs.microsoft.com/en-us/dotnet/api/system.threading.interlocked?view=netcore-3.1
    // https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Collections/Concurrent/ConcurrentQueue.cs
    // https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Collections/Concurrent/ConcurrentQueueSegment.cs

    /// <summary>
    /// Concurrent safe circular buffer that will used a fixed capacity specified and resuse slots as it goes.
    /// </summary>
    /// <typeparam name="TObject">The object that you want to go into the slots.</typeparam>
    public class ConcurrentCircularBuffer<TObject>
    {
        private readonly ConcurrentQueue<TObject> _queue;

        public int Capacity { get; private set; }

        public ConcurrentCircularBuffer(int capacity)
        {
            if(capacity <= 0)
            {
                throw new ArgumentException($"The capacity specified '{capacity}' is not valid.", nameof(capacity));
            }

            // Setup the queue to the initial capacity using List's underlying implementation.
            _queue = new ConcurrentQueue<TObject>(new List<TObject>(capacity));

            Capacity = capacity;
        }

        public void Enqueue(TObject @object)
        {
            // Enforce the capacity first so the head can be used instead of the entire segment (slow).
            while (_queue.Count + 1 > Capacity)
            {
                if (!_queue.TryDequeue(out _))
                {
                    // Handle error condition however you want to ie throw, return validation object, etc.
                    var ex = new Exception("Concurrent Dequeue operation failed.");
                    ex.Data.Add("EnqueueObject", @object);
                    throw ex;
                }
            }

            // Place the item into the queue
            _queue.Enqueue(@object);
        }

        public TObject Dequeue()
        {
            if(_queue.TryDequeue(out var result))
            {
                return result;
            }

            return default;
        }
    }
}
jjhayter
źródło
0

Oto kolejna implementacja, która wykorzystuje bazową ConcurrentQueue w jak największym stopniu, zapewniając jednocześnie te same interfejsy, które są dostępne za pośrednictwem ConcurrentQueue.

/// <summary>
/// This is a FIFO concurrent queue that will remove the oldest added items when a given limit is reached.
/// </summary>
/// <typeparam name="TValue"></typeparam>
public class FixedSizedConcurrentQueue<TValue> : IProducerConsumerCollection<TValue>, IReadOnlyCollection<TValue>
{
    private readonly ConcurrentQueue<TValue> _queue;

    private readonly object _syncObject = new object();

    public int LimitSize { get; }

    public FixedSizedConcurrentQueue(int limit)
    {
        _queue = new ConcurrentQueue<TValue>();
        LimitSize = limit;
    }

    public FixedSizedConcurrentQueue(int limit, System.Collections.Generic.IEnumerable<TValue> collection)
    {
        _queue = new ConcurrentQueue<TValue>(collection);
        LimitSize = limit;

    }

    public int Count => _queue.Count;

    bool ICollection.IsSynchronized => ((ICollection) _queue).IsSynchronized;

    object ICollection.SyncRoot => ((ICollection)_queue).SyncRoot; 

    public bool IsEmpty => _queue.IsEmpty;

    // Not supported until .NET Standard 2.1
    //public void Clear() => _queue.Clear();

    public void CopyTo(TValue[] array, int index) => _queue.CopyTo(array, index);

    void ICollection.CopyTo(Array array, int index) => ((ICollection)_queue).CopyTo(array, index);

    public void Enqueue(TValue obj)
    {
        _queue.Enqueue(obj);
        lock( _syncObject )
        {
            while( _queue.Count > LimitSize ) {
                _queue.TryDequeue(out _);
            }
        }
    }

    public IEnumerator<TValue> GetEnumerator() => _queue.GetEnumerator();

    IEnumerator IEnumerable.GetEnumerator() => ((IEnumerable<TValue>)this).GetEnumerator();

    public TValue[] ToArray() => _queue.ToArray();

    public bool TryAdd(TValue item)
    {
        Enqueue(item);
        return true;
    }

    bool IProducerConsumerCollection<TValue>.TryTake(out TValue item) => TryDequeue(out item);

    public bool TryDequeue(out TValue result) => _queue.TryDequeue(out result);

    public bool TryPeek(out TValue result) => _queue.TryPeek(out result);

}
Tod Cunningham
źródło
-1

Oto moja wersja kolejki:

public class FixedSizedQueue<T> {
  private object LOCK = new object();
  ConcurrentQueue<T> queue;

  public int MaxSize { get; set; }

  public FixedSizedQueue(int maxSize, IEnumerable<T> items = null) {
     this.MaxSize = maxSize;
     if (items == null) {
        queue = new ConcurrentQueue<T>();
     }
     else {
        queue = new ConcurrentQueue<T>(items);
        EnsureLimitConstraint();
     }
  }

  public void Enqueue(T obj) {
     queue.Enqueue(obj);
     EnsureLimitConstraint();
  }

  private void EnsureLimitConstraint() {
     if (queue.Count > MaxSize) {
        lock (LOCK) {
           T overflow;
           while (queue.Count > MaxSize) {
              queue.TryDequeue(out overflow);
           }
        }
     }
  }


  /// <summary>
  /// returns the current snapshot of the queue
  /// </summary>
  /// <returns></returns>
  public T[] GetSnapshot() {
     return queue.ToArray();
  }
}

Uważam, że przydatne jest posiadanie konstruktora, który jest zbudowany na IEnumerable i uważam, że przydatne jest posiadanie GetSnapshot, aby mieć wielowątkową bezpieczną listę (w tym przypadku tablicę) elementów w momencie wywołania, która nie rośnie błędy, jeśli zmieni się kolekcja podkładów.

Podwójna kontrola liczenia ma na celu zapobieżenie blokadzie w niektórych okolicznościach.

Nieważne
źródło
1
Głosowanie za zablokowaniem kolejki. Jeśli absolutnie chcesz zablokować, najlepszy byłby ReaderWriterLockSlim (zakładając, że oczekujesz częstszego blokowania odczytu niż blokady zapisu). GetSnapshot również nie jest potrzebny. Jeśli zaimplementujesz IReadOnlyCollection <T> (co powinieneś zrobić dla semantyki IEnumerable), ToList () będzie obsługiwać tę samą funkcję.
Josh
ConcurrentQueue obsługuje blokady w swojej implementacji, zobacz linki w mojej odpowiedzi.
jjhayter