Brak ConcurrentList <T> w .Net 4.0?

198

Byłem zachwycony, widząc nową System.Collections.Concurrentprzestrzeń nazw w .Net 4.0, całkiem fajnie! Widziałem ConcurrentDictionary, ConcurrentQueue, ConcurrentStack, ConcurrentBagi BlockingCollection.

Jedną z rzeczy, której wydaje się być tajemniczo brakuje, jest ConcurrentList<T>. Czy sam muszę to napisać (lub pobrać z Internetu :))?

Czy brakuje mi czegoś oczywistego?

Alan
źródło
7
ConcurrentBag <T> ( msdn.microsoft.com/pt-br/library/… )
Rodrigo Reis
4
@RodrigoReis, ConcurrentBag <T> to nieuporządkowana kolekcja, podczas gdy lista <T> jest uporządkowana.
Adam Calvet Bohl
4
Jak możesz mieć zamówioną kolekcję w środowisku wielowątkowym? Z założenia nigdy nie miałbyś kontroli nad sekwencją elementów.
Jeremy Holovacs,
Zamiast tego użyj zamka
Erik Bergstedt,
w kodzie źródłowym dotnet znajduje się plik o nazwie ThreadSafeList.cs, który wygląda jak trochę kodu poniżej. Używa również ReaderWriterLockSlim i próbował dowiedzieć się, dlaczego używać tego zamiast prostej blokady (obj)?
Colin Lamarre

Odpowiedzi:

166

I spróbowaliśmy jakiś czas temu (też: na GitHub ). Moja implementacja miała pewne problemy, o których nie będę tutaj wchodził. Co ważniejsze, powiem ci, czego się nauczyłem.

Po pierwsze, nie ma możliwości, aby uzyskać pełną implementację IList<T>tego bez blokady i wątków. W szczególności losowe wstawianie i usuwanie nie będzie działać, chyba że zapomnisz także o losowym dostępie O (1) (tj. Chyba że „oszukasz” i po prostu użyjesz jakiejś połączonej listy i pozwolisz indeksacji ssać).

Co ja pomyślałem może być wart był bezpieczny wątku, ograniczony podzbiór IList<T>: w szczególności taki, który pozwala osobie Addi zapewnić losową tylko do odczytu dostępu indeksu (ale nie Insert, RemoveAtitp, a także nie przypadkowy zapisu dostępu).

To był cel mojej ConcurrentList<T>realizacji . Ale kiedy przetestowałem jego wydajność w scenariuszach wielowątkowych, stwierdziłem, że zwykła synchronizacja dodaje do a List<T>była szybsza . Zasadniczo dodawanie do List<T>jest już błyskawiczne; złożoność zaangażowanych kroków obliczeniowych jest niewielka (zwiększ indeks i przypisz elementowi tablicy; to naprawdę tyle ). Byś potrzebował mnóstwo jednoczesnych zapisów zobaczyć jakiejkolwiek blokady niezgody na ten temat; i nawet wtedy średnia wydajność każdego zapisu nadal przewyższałaby droższą, choć bez blokady, implementację ConcurrentList<T>.

W stosunkowo rzadkim przypadku, gdy wewnętrzna tablica listy musi się zmienić, płacisz niewielki koszt. W końcu doszedłem do wniosku, że był to jedyny niszowy scenariusz, w którym ConcurrentList<T>typ kolekcji tylko do dodania miałby sens: gdy chcesz zagwarantować niski koszt dodawania elementu przy każdym pojedynczym wywołaniu (w przeciwieństwie do zamortyzowanego celu wydajności).

To po prostu nie jest tak przydatna klasa, jak mogłoby się wydawać.

Dan Tao
źródło
52
A jeśli potrzebujesz czegoś podobnego do List<T>tego, co używa synchronizacji opartej na starym skoolu, monitor jest SynchronizedCollection<T>ukryty w BCL: msdn.microsoft.com/en-us/library/ms668265.aspx
LukeH
8
Jeden mały dodatek: użyj parametru Konstruktora wydajności, aby uniknąć (w jak największym stopniu) scenariusza zmiany rozmiaru.
Henk Holterman,
2
Największy scenariusz, w którym ConcurrentListwygrana byłaby wygrana, miałaby miejsce, gdy lista nie byłaby zbyt aktywna, ale jest wielu współbieżnych czytelników. Można zmniejszyć obciążenie czytelników do pojedynczej bariery pamięci (i nawet to wyeliminować, jeśli czytelnicy nie przejmują się nieco przestarzałymi danymi).
supercat
2
@Kevin: Konstruowanie ConcurrentList<T>takiego w taki sposób, że czytelnicy mają zagwarantowany spójny stan bez potrzeby blokowania, jest stosunkowo proste. Gdy lista rozszerzy się z np. Rozmiaru 32 na 64, zachowaj tablicę rozmiaru 32 i utwórz nową tablicę rozmiaru 64. Podczas dodawania każdego z następnych 32 elementów włóż go do gniazda 32-63 nowej tablicy i skopiuj stary element z tablicy o rozmiarze 32 do nowej. Dopóki nie zostanie dodany 64. element, czytelnicy będą szukać w tablicy rozmiar 32 dla pozycji 0-31 oraz w tablicy rozmiar 64 dla pozycji 32-63.
supercat
2
Po dodaniu 64. elementu tablica rozmiaru 32 będzie nadal działać przy pobieraniu elementów 0–31, ale czytelnicy nie będą już musieli go używać. Mogą używać tablicy rozmiar-64 dla wszystkich pozycji 0-63 oraz tablicy rozmiar-128 dla pozycji 64-127. Narzut związany z wyborem jednej z dwóch tablic, a także bariera pamięci w razie potrzeby, byłaby mniejsza niż narzut nawet najbardziej wydajnej blokady czytnik-zapis, jaką można sobie wyobrazić. Zapisy powinny prawdopodobnie używać blokad (bez blokady byłoby to możliwe, zwłaszcza gdyby nie przeszkadzało tworzenie nowej instancji obiektu przy każdym wstawianiu, ale blokada powinna być tania.
supercat
38

Do czego użyłbyś ConcurrentList?

Koncepcja kontenera o dostępie swobodnym w świecie wątków nie jest tak przydatna, jak może się wydawać. Twierdzenie

  if (i < MyConcurrentList.Count)  
      x = MyConcurrentList[i]; 

jako całość nadal nie byłaby bezpieczna dla wątków.

Zamiast tworzyć ConcurrentList, spróbuj budować rozwiązania z tym, co tam jest. Najpopularniejsze klasy to ConcurrentBag, a zwłaszcza BlockingCollection.

Henk Holterman
źródło
Słuszna uwaga. Jednak to, co robię, jest trochę bardziej przyziemne. Właśnie próbuję przypisać ConcurrentBag <T> do IList <T>. Mógłbym zmienić swoją właściwość na IEnumerable <T>, ale wtedy nie mogę .Dodaj do niej rzeczy.
Alan,
1
@Alan: Nie można tego zaimplementować bez zablokowania listy. Ponieważ i tak możesz już tego użyć Monitor, nie ma powodu, aby istniała lista współbieżna.
Billy ONeal,
6
@dcp - tak, to z natury nie bezpieczne dla wątków. ConcurrentDictionary ma specjalne metody, które robią to w jednej operacji atomowej, takie jak AddOrUpdate, GetOrAdd, TryUpdate itp. Nadal mają ContainsKey, ponieważ czasami po prostu chcesz wiedzieć, czy klucz jest dostępny bez modyfikowania słownika (pomyśl HashSet)
Zarat
3
@dcp - ContainsKey sam w sobie jest wątkowo bezpieczny, twój przykład (nie ContainsKey!) ma po prostu warunek wyścigu, ponieważ wykonujesz drugie połączenie w zależności od pierwszej decyzji, która w tym momencie może być już nieaktualna.
Zarat
2
Henk, nie zgadzam się. Myślę, że istnieje prosty scenariusz, w którym może być bardzo przydatny. Zapis wątku roboczego w nim spowoduje, że interfejs użytkownika odczyta i zaktualizuje odpowiednio interfejs. Jeśli chcesz dodać element w uporządkowany sposób, będzie on wymagał zapisu w przypadkowym dostępie. Możesz także użyć stosu i widoku danych, ale będziesz musiał utrzymywać 2 kolekcje :-(.
Eric Ouellet
19

Z całym szacunkiem dla już udzielonych wspaniałych odpowiedzi, czasami po prostu chcę bezpiecznej dla wątków IList. Nic zaawansowanego ani wymyślnego. Wydajność jest ważna w wielu przypadkach, ale czasami nie stanowi to problemu. Tak, zawsze będą wyzwania bez metod takich jak „TryGetValue” itp., Ale w większości przypadków chcę tylko czegoś, co mogę wymienić bez konieczności martwienia się o blokowanie wszystkiego. I tak, ktoś prawdopodobnie może znaleźć „błąd” w mojej implementacji, który może doprowadzić do impasu lub czegoś (przypuszczam), ale bądźmy szczerzy: jeśli chodzi o wielowątkowość, jeśli nie napiszesz poprawnie kodu, to i tak jest w impasie. Mając to na uwadze, postanowiłem wykonać prostą implementację ConcurrentList, która zaspokoi te podstawowe potrzeby.

I za to, co warto: wykonałem podstawowy test dodania 10 000 000 pozycji do zwykłej listy i listy współbieżnych, a wyniki były następujące:

Lista zakończona za: 7793 milisekundy. Współbieżne ukończone za: 8064 milisekund.

public class ConcurrentList<T> : IList<T>, IDisposable
{
    #region Fields
    private readonly List<T> _list;
    private readonly ReaderWriterLockSlim _lock;
    #endregion

    #region Constructors
    public ConcurrentList()
    {
        this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
        this._list = new List<T>();
    }

    public ConcurrentList(int capacity)
    {
        this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
        this._list = new List<T>(capacity);
    }

    public ConcurrentList(IEnumerable<T> items)
    {
        this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
        this._list = new List<T>(items);
    }
    #endregion

    #region Methods
    public void Add(T item)
    {
        try
        {
            this._lock.EnterWriteLock();
            this._list.Add(item);
        }
        finally
        {
            this._lock.ExitWriteLock();
        }
    }

    public void Insert(int index, T item)
    {
        try
        {
            this._lock.EnterWriteLock();
            this._list.Insert(index, item);
        }
        finally
        {
            this._lock.ExitWriteLock();
        }
    }

    public bool Remove(T item)
    {
        try
        {
            this._lock.EnterWriteLock();
            return this._list.Remove(item);
        }
        finally
        {
            this._lock.ExitWriteLock();
        }
    }

    public void RemoveAt(int index)
    {
        try
        {
            this._lock.EnterWriteLock();
            this._list.RemoveAt(index);
        }
        finally
        {
            this._lock.ExitWriteLock();
        }
    }

    public int IndexOf(T item)
    {
        try
        {
            this._lock.EnterReadLock();
            return this._list.IndexOf(item);
        }
        finally
        {
            this._lock.ExitReadLock();
        }
    }

    public void Clear()
    {
        try
        {
            this._lock.EnterWriteLock();
            this._list.Clear();
        }
        finally
        {
            this._lock.ExitWriteLock();
        }
    }

    public bool Contains(T item)
    {
        try
        {
            this._lock.EnterReadLock();
            return this._list.Contains(item);
        }
        finally
        {
            this._lock.ExitReadLock();
        }
    }

    public void CopyTo(T[] array, int arrayIndex)
    {
        try
        {
            this._lock.EnterReadLock();
            this._list.CopyTo(array, arrayIndex);
        }
        finally
        {
            this._lock.ExitReadLock();
        }
    }

    public IEnumerator<T> GetEnumerator()
    {
        return new ConcurrentEnumerator<T>(this._list, this._lock);
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return new ConcurrentEnumerator<T>(this._list, this._lock);
    }

    ~ConcurrentList()
    {
        this.Dispose(false);
    }

    public void Dispose()
    {
        this.Dispose(true);
    }

    private void Dispose(bool disposing)
    {
        if (disposing)
            GC.SuppressFinalize(this);

        this._lock.Dispose();
    }
    #endregion

    #region Properties
    public T this[int index]
    {
        get
        {
            try
            {
                this._lock.EnterReadLock();
                return this._list[index];
            }
            finally
            {
                this._lock.ExitReadLock();
            }
        }
        set
        {
            try
            {
                this._lock.EnterWriteLock();
                this._list[index] = value;
            }
            finally
            {
                this._lock.ExitWriteLock();
            }
        }
    }

    public int Count
    {
        get
        {
            try
            {
                this._lock.EnterReadLock();
                return this._list.Count;
            }
            finally
            {
                this._lock.ExitReadLock();
            }
        }
    }

    public bool IsReadOnly
    {
        get { return false; }
    }
    #endregion
}

    public class ConcurrentEnumerator<T> : IEnumerator<T>
{
    #region Fields
    private readonly IEnumerator<T> _inner;
    private readonly ReaderWriterLockSlim _lock;
    #endregion

    #region Constructor
    public ConcurrentEnumerator(IEnumerable<T> inner, ReaderWriterLockSlim @lock)
    {
        this._lock = @lock;
        this._lock.EnterReadLock();
        this._inner = inner.GetEnumerator();
    }
    #endregion

    #region Methods
    public bool MoveNext()
    {
        return _inner.MoveNext();
    }

    public void Reset()
    {
        _inner.Reset();
    }

    public void Dispose()
    {
        this._lock.ExitReadLock();
    }
    #endregion

    #region Properties
    public T Current
    {
        get { return _inner.Current; }
    }

    object IEnumerator.Current
    {
        get { return _inner.Current; }
    }
    #endregion
}
Brian Booth
źródło
5
OK, stara odpowiedź, ale nadal: RemoveAt(int index)nigdy nie Insert(int index, T item)jest bezpieczna dla wątków, jest bezpieczna tylko dla indeksu == 0, zwrot IndexOf()jest natychmiast przestarzały itp. Nawet nie zaczynaj o this[int].
Henk Holterman
2
I nie potrzebujesz i nie chcesz ~ Finalizatora ().
Henk Holterman
2
Mówisz, że dałeś się na zapobieganiu możliwość zakleszczenia - i jeden ReaderWriterLockSlimmoże być wykonane do impasu łatwo stosując EnterUpgradeableReadLock()jednocześnie. Nie używasz go jednak, nie udostępniasz zamka na zewnątrz i nie wywołujesz np. Metody, która wchodzi w blokadę zapisu, przytrzymując blokadę odczytu, więc korzystanie z klasy nie powoduje już zakleszczenia prawdopodobne.
Eugene Beresovsky
1
Interfejs niejednorodny nie jest odpowiedni dla równoczesnego dostępu. Np. Poniższe nie są atomowe var l = new ConcurrentList<string>(); /* ... */ l[0] += "asdf";. Ogólnie rzecz biorąc, każda kombinacja odczytu i zapisu może prowadzić do poważnych kłopotów, gdy jest wykonywana jednocześnie. Dlatego współbieżnych struktur danych na ogół zapewniają metody, takie jak te ConcurrentDictionary„s AddOrGetitp NB stałej (i zbędne, ponieważ członkowie są już oznaczone jako takie przez podkreślenie) powtarzanie this.zaśmiecanie.
Eugene Beresovsky
1
Dzięki, Eugene. Jestem intensywnym użytkownikiem .NET Reflector, który umieszcza „to”. na wszystkich polach niestatycznych. Jako taki, wolałem to samo. W związku z tym, że ten niebieżny interfejs nie jest odpowiedni: Masz absolutną rację, że próba wykonania wielu działań przeciwko mojej implementacji może stać się zawodna. Ale wymaganie tutaj polega po prostu na tym, że pojedyncze działania (dodawanie, usuwanie, usuwanie lub wyliczanie) można wykonać bez uszkadzania kolekcji. Zasadniczo eliminuje potrzebę umieszczania instrukcji blokady wokół wszystkiego.
Brian Booth
11

ConcurrentList(jako tablica o zmiennym rozmiarze, a nie połączona lista) nie jest łatwo pisać przy użyciu operacji nieblokujących. Jego interfejs API nie przekłada się dobrze na wersję „współbieżną”.

Stephen Cleary
źródło
12
Nie tylko trudno jest pisać, ale nawet trudno jest znaleźć przydatny interfejs.
CodesInChaos
11

Powodem, dla którego nie ma ConcurrentList, jest to, że zasadniczo nie można go zapisać. Powodem jest to, że kilka ważnych operacji w IList opiera się na indeksach i po prostu nie działa. Na przykład:

int catIndex = list.IndexOf("cat");
list.Insert(catIndex, "dog");

Efektem, do którego dąży autor, jest wstawienie „pies” przed „kot”, ale w środowisku wielowątkowym wszystko może się zdarzyć na liście między tymi dwoma wierszami kodu. Na przykład mógłby zrobić inny wątek list.RemoveAt(0), przesuwając całą listę w lewo, ale co najważniejsze, catIndex się nie zmieni. Wpływ na to jest taki, że Insertoperacja spowoduje umieszczenie „psa” za kotem, a nie przed nim.

Kilka implementacji, które widzisz jako „odpowiedzi” na to pytanie, ma dobre intencje, ale jak pokazuje powyższy, nie oferują wiarygodnych wyników. Jeśli naprawdę chcesz semantyki podobnej do listy w środowisku wielowątkowym, nie możesz się tam dostać, umieszczając blokady w metodach implementacji listy. Musisz upewnić się, że każdy indeks, którego używasz, działa całkowicie w kontekście blokady. Rezultatem jest to, że możesz używać Listy w środowisku wielowątkowym z właściwym blokowaniem, ale sama lista nie może istnieć w tym świecie.

Jeśli uważasz, że potrzebujesz współbieżnej listy, istnieją naprawdę tylko dwie możliwości:

  1. Tak naprawdę potrzebujesz ConcurrentBag
  2. Musisz stworzyć własną kolekcję, być może zaimplementowaną z Listą i własną kontrolą współbieżności.

Jeśli masz ConcurrentBag i jesteś w pozycji, w której musisz przekazać go jako IList, masz problem, ponieważ wywoływana metoda określa, że ​​mogą spróbować zrobić coś takiego jak ja powyżej z cat & pies. W większości światów oznacza to, że metoda, którą wywołujesz, po prostu nie jest zbudowana do pracy w środowisku wielowątkowym. Oznacza to, że albo przebudujesz go tak, aby był, albo, jeśli nie możesz, będziesz musiał obchodzić się z nim bardzo ostrożnie. Prawie na pewno będziesz musiał stworzyć własną kolekcję z własnymi zamkami i wywołać metodę obrażającą w obrębie zamka.

Steve Benz
źródło
5

W przypadkach, gdy odczyty znacznie przewyższają liczbę zapisów lub (jakkolwiek częste) zapisy nie są współbieżne , odpowiednie może być podejście kopiowania przy zapisie .

Implementacja pokazana poniżej to

  • bez zamka
  • błyskawicznie szybki dla równoczesnych odczytów , nawet podczas jednoczesnych modyfikacji - bez względu na to, jak długo to zajmie
  • ponieważ „migawki” są niezmienne, atomowość bez blokady jest możliwa, tzn. var snap = _list; snap[snap.Count - 1];nigdy (cóż, z wyjątkiem pustej listy oczywiście) nie rzuca, a także dostajesz bezpieczne wątkowo wyliczanie z semantyką migawek za darmo .. jak KOCHAM niezmienność!
  • zaimplementowane ogólnie , mające zastosowanie do dowolnej struktury danych i każdego rodzaju modyfikacji
  • bardzo prosty , tzn. łatwy do przetestowania, debugowania, weryfikacji poprzez odczytanie kodu
  • do użytku w .Net 3.5

Aby kopiowanie przy zapisie działało, musisz utrzymywać swoje struktury danych w sposób niezmienny , tzn. Nikt nie może ich zmieniać po udostępnieniu ich innym wątkom. Kiedy chcesz zmodyfikować, ty

  1. sklonuj strukturę
  2. wprowadź zmiany w klonie
  3. zamień atomowo w odniesieniu do zmodyfikowanego klonu

Kod

static class CopyOnWriteSwapper
{
    public static void Swap<T>(ref T obj, Func<T, T> cloner, Action<T> op)
        where T : class
    {
        while (true)
        {
            var objBefore = Volatile.Read(ref obj);
            var newObj = cloner(objBefore);
            op(newObj);
            if (Interlocked.CompareExchange(ref obj, newObj, objBefore) == objBefore)
                return;
        }
    }
}

Stosowanie

CopyOnWriteSwapper.Swap(ref _myList,
    orig => new List<string>(orig),
    clone => clone.Add("asdf"));

Jeśli potrzebujesz większej wydajności, pomoże to uogólnić metodę, np. Utwórz jedną metodę dla każdego rodzaju modyfikacji (dodaj, usuń, ...), którą chcesz, i zapisz wskaźniki funkcji cloneri op.

Uwaga nr 1 Twoim obowiązkiem jest upewnić się, że nikt nie modyfikuje (rzekomo) niezmiennej struktury danych. Nic nie możemy zrobić w ogólnej implementacji, aby temu zapobiec, ale specjalizując się w niej List<T>, możesz uchronić się przed modyfikacją za pomocą List.AsReadOnly ()

NB # 2 Uważaj na wartości z listy. Powyższe podejście do kopiowania chroni tylko ich członkostwo na liście, ale jeśli nie wstawisz ciągów, ale niektóre inne zmienne obiekty, musisz zadbać o bezpieczeństwo wątków (np. Blokowanie). Jest to jednak ortogonalne w stosunku do tego rozwiązania i np. Można zablokować zmienne wartości bez problemu. Musisz tylko być tego świadomy.

Uwaga nr 3 Jeśli struktura danych jest ogromna i często ją modyfikujesz, podejście kopiowania wszystkich danych w trakcie zapisu może być wygórowane zarówno pod względem zużycia pamięci, jak i kosztów procesora związanych z kopiowaniem. W takim przypadku możesz zamiast tego skorzystać z niezmiennych kolekcji MS .

Eugene Beresovsky
źródło
3

System.Collections.Generic.List<t>jest już bezpieczny dla wielu czytelników. Próba uczynienia tego wątkiem bezpiecznym dla wielu pisarzy nie miałaby sensu. (Z powodów, o których wspominali już Henk i Stephen)

Billy ONeal
źródło
Nie widzisz scenariusza, w którym mogę dodać 5 wątków do listy? W ten sposób możesz zobaczyć listę gromadzącą rekordy, nawet zanim wszystkie się zakończą.
Alan,
9
@Alan - będzie to ConcurrentQueue, ConcurrentStack lub ConcurrentBag. Aby zrozumieć ConcurrentList, należy podać przypadek użycia, w którym dostępne klasy nie są wystarczające. Nie rozumiem, dlaczego miałbym chcieć dostępu zindeksowanego, gdy elementy w indeksach mogą losowo zmieniać się poprzez jednoczesne usuwanie. A dla „zablokowanego” odczytu możesz już zrobić migawki istniejących współbieżnych klas i umieścić je na liście.
Zarat
Masz rację - nie chcę dostępu indeksowanego. Ogólnie używam IList <T> jako proxy dla IEnumerable, do którego mogę .Add (T) nowe elementy. Tak naprawdę pochodzi to pytanie.
Alan
@Alan: W takim razie chcesz kolejki, a nie listy.
Billy ONeal
3
Myślę że się mylisz. Mówienie: bezpieczne dla wielu czytelników nie oznacza, że ​​nie możesz pisać jednocześnie. Zapis oznaczałby również usunięcie i otrzymasz błąd, jeśli usuniesz go podczas iteracji.
Eric Ouellet,
2

Niektórzy zauważyli niektóre punkty towarów (i niektóre z moich myśli):

  • Może wyglądać na szaleństwo z powodu niemożności losowego dostępu (indeksatora), ale dla mnie wygląda to dobrze. Trzeba tylko pomyśleć, że istnieje wiele metod w kolekcjach wielowątkowych, które mogą zawieść, takie jak Indexer i Delete. Można również zdefiniować działanie powodujące awarię (zapis awaryjny) dla modułu zapisującego, takie jak „błąd” lub po prostu „dodaj na końcu”.
  • Nie dlatego, że jest to kolekcja wielowątkowa, że ​​zawsze będzie używana w kontekście wielowątkowym. Lub może być również używany tylko przez jednego pisarza i jednego czytelnika.
  • Innym sposobem na bezpieczne korzystanie z indeksatora może być zawinięcie akcji w blokadę kolekcji za pomocą jej katalogu głównego (jeśli zostanie podana do wiadomości publicznej).
  • Dla wielu osób widoczność rootLocka brzmi „Dobra praktyka”. Nie jestem w 100% pewien co do tego punktu, ponieważ jeśli jest ukryty, usuwasz dużą elastyczność dla użytkownika. Zawsze musimy pamiętać, że programowanie wielowątkowości nie jest dla nikogo. Nie możemy zapobiec wszelkiego rodzaju niewłaściwemu użyciu.
  • Microsoft będzie musiał popracować i zdefiniować nowy standard, aby wprowadzić prawidłowe użycie kolekcji wielowątkowej. Po pierwsze IEnumerator nie powinien mieć parametru moveNext, ale powinien mieć GetNext, który zwraca true lub false i uzyskuje parametr wyjściowy typu T (w ten sposób iteracja nie będzie już blokować). Ponadto Microsoft już używa „za pomocą” wewnętrznie w foreach, ale czasami używa bezpośrednio IEnumeratora bez owijania go za pomocą „za pomocą” (błąd w widoku kolekcji i prawdopodobnie w większej liczbie miejsc) - Korzystanie z IEnumeratora jest zalecane przez Microsoft. Ten błąd usuwa duży potencjał bezpiecznego iteratora ... Iterator, który blokuje kolekcję w konstruktorze i odblokowuje za pomocą metody Dispose - dla blokującej metody foreach.

To nie jest odpowiedź. To tylko komentarze, które tak naprawdę nie pasują do konkretnego miejsca.

... Mój wniosek, Microsoft musi wprowadzić głębokie zmiany w „foreach”, aby ułatwić korzystanie z kolekcji MultiThreaded. Musi również przestrzegać własnych zasad użytkowania IEnumerator. Do tego czasu możemy łatwo napisać MultiThreadList, który używałby iteratora blokującego, ale który nie byłby zgodny z „IList”. Zamiast tego będziesz musiał zdefiniować własny interfejs „IListPersonnal”, który może zawieść bez „wyjątku” przy „wstawianiu”, „usuwaniu” i przypadkowym akcesoriu (indeksatorze). Ale kto będzie chciał z niego korzystać, jeśli nie jest to standard?

Eric Ouellet
źródło
Można łatwo napisać, ConcurrentOrderedBag<T>która zawierałaby implementację tylko do odczytu IList<T>, ale oferowałaby również int Add(T value)metodę w pełni bezpieczną dla wątków . Nie rozumiem, dlaczego ForEachpotrzebne byłyby jakiekolwiek zmiany. Chociaż Microsoft nie mówi tego wprost, ich praktyka sugeruje, że IEnumerator<T>wyliczenie zawartości kolekcji, która istniała podczas jej tworzenia, jest całkowicie do przyjęcia ; wyjątek zmodyfikowany przez kolekcję jest wymagany tylko wtedy, gdy moduł wyliczający nie byłby w stanie zagwarantować bezproblemowego działania.
supercat
Iterując po kolekcji MT, sposób, w jaki jest zaprojektowany, może doprowadzić, jak powiedziałeś, do wyjątku ... Którego nie znam. Czy złapałbyś wszystkie wyjątki? W mojej własnej książce wyjątek stanowi wyjątek i nie powinien występować przy normalnym wykonywaniu kodu. W przeciwnym razie, aby zapobiec wyjątkowi, musisz albo zablokować kolekcję, albo otrzymać kopię (w bezpieczny sposób - np. Zablokować) lub wdrożyć bardzo złożony mechanizm w kolekcji, aby zapobiec wystąpieniu wyjątku z powodu współbieżności. Myślałem jednak, że byłoby miło dodać IEnumeratorMT, który blokowałby kolekcję, podczas gdy dla każdego wystąpił i dodawał powiązany kod ...
Eric Ouellet
Inną rzeczą, która może się również zdarzyć, jest to, że po otrzymaniu iteratora można zablokować kolekcję, a gdy iterator jest pobrany GC, można odblokować kolekcję. Według Microsfot sprawdzają już, czy IEnumerable jest także IDisposable i jeśli tak, wywołują GC na końcu ForEach. Głównym problemem jest to, że używają IEnumerable w innym miejscu bez wywoływania GC, więc nie można na tym polegać. Posiadanie nowego przejrzystego interfejsu MT dla blokowania włączania IEnumerable rozwiązałoby problem, przynajmniej jego część. (Nie powstrzymałoby to ludzi przed dzwonieniem).
Eric Ouellet,
Jest to bardzo zła forma dla publicznej GetEnumeratormetody pozostawiania kolekcji zablokowanej po jej zwróceniu; takie projekty mogą łatwo doprowadzić do impasu. Nie IEnumerable<T>zawiera wskazania, czy można oczekiwać zakończenia wyliczenia, nawet jeśli kolekcja zostanie zmodyfikowana; najlepsze, co można zrobić, to napisać własne metody, aby to zrobiły, i mieć metody, które akceptują IEnumerable<T>dokumentowanie faktu, że będzie ono bezpieczne tylko pod warunkiem, że IEnumerable<T>obsługuje wyliczenie bezpieczne pod kątem wątków.
supercat
Najbardziej pomocne byłoby IEnumerable<T>włączenie metody „Snapshot” z typem zwracanym IEnumerable<T>. Niezmienne kolekcje mogą się zwrócić; kolekcja ograniczona może, jeśli nic innego nie skopiować się do List<T>lub, T[]i wywołać GetEnumeratorto. Niektóre niepowiązane kolekcje mogłyby zostać zaimplementowane Snapshot, a te, które nie byłyby w stanie zgłosić wyjątku bez próby wypełnienia listy swoją zawartością.
supercat
1

W sekwencyjnie wykonywanym kodzie stosowane struktury danych różnią się od (dobrze napisanego) jednocześnie wykonującego się kodu. Powodem jest to, że kod sekwencyjny implikuje porządek niejawny. Współbieżny kod nie oznacza jednak żadnego zamówienia; jeszcze lepiej, implikuje to brak określonej kolejności!

Z tego powodu struktury danych z domyślną kolejnością (np. Lista) nie są zbyt przydatne do rozwiązywania równoczesnych problemów. Lista implikuje porządek, ale nie określa jasno, czym jest ta kolejność. Z tego powodu kolejność wykonywania kodu manipulującego listą określa (do pewnego stopnia) domyślną kolejność listy, która jest w bezpośrednim konflikcie z wydajnym rozwiązaniem współbieżnym.

Pamiętaj, że współbieżność to problem z danymi, a nie problem z kodem! Nie możesz najpierw zaimplementować kodu (lub przepisać istniejącego kodu sekwencyjnego) i uzyskać dobrze zaprojektowane współbieżne rozwiązanie. Najpierw musisz zaprojektować struktury danych, pamiętając o tym, że niejawne porządkowanie nie istnieje w systemie współbieżnym.

Blueprint41
źródło
1

Bezblokowe kopiowanie i zapisywanie działa świetnie, jeśli nie masz do czynienia z zbyt dużą liczbą elementów. Oto klasa, którą napisałem:

public class CopyAndWriteList<T>
{
    public static List<T> Clear(List<T> list)
    {
        var a = new List<T>(list);
        a.Clear();
        return a;
    }

    public static List<T> Add(List<T> list, T item)
    {
        var a = new List<T>(list);
        a.Add(item);
        return a;
    }

    public static List<T> RemoveAt(List<T> list, int index)
    {
        var a = new List<T>(list);
        a.RemoveAt(index);
        return a;
    }

    public static List<T> Remove(List<T> list, T item)
    {
        var a = new List<T>(list);
        a.Remove(item);
        return a;
    }

}

przykładowe użycie: Orders_BUY = CopyAndWriteList.Clear (Orders_BUY);

Rob The Quant
źródło
zamiast blokować tworzy kopię listy, modyfikuje listę i ustawia odwołanie do nowej listy. Tak więc wszelkie inne wątki, które się powtarzają, nie powodują żadnych problemów.
Rob The Quant
0

Zaimplementowałem jeden podobny do Briana . Mój jest inny:

  • Zarządzam tablicą bezpośrednio.
  • Nie wprowadzam zamków w bloku try.
  • Używam yield returndo tworzenia modułu wyliczającego.
  • Obsługuję rekurencję blokady. Umożliwia to odczytywanie z listy podczas iteracji.
  • Tam, gdzie to możliwe, używam blokad odczytu z możliwością aktualizacji.
  • DoSyncoraz GetSyncmetody pozwalające na sekwencyjne interakcje wymagające wyłącznego dostępu do listy.

Kod :

public class ConcurrentList<T> : IList<T>, IDisposable
{
    private ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
    private int _count = 0;

    public int Count
    {
        get
        { 
            _lock.EnterReadLock();
            try
            {           
                return _count;
            }
            finally
            {
                _lock.ExitReadLock();
            }
        }
    }

    public int InternalArrayLength
    { 
        get
        { 
            _lock.EnterReadLock();
            try
            {           
                return _arr.Length;
            }
            finally
            {
                _lock.ExitReadLock();
            }
        }
    }

    private T[] _arr;

    public ConcurrentList(int initialCapacity)
    {
        _arr = new T[initialCapacity];
    }

    public ConcurrentList():this(4)
    { }

    public ConcurrentList(IEnumerable<T> items)
    {
        _arr = items.ToArray();
        _count = _arr.Length;
    }

    public void Add(T item)
    {
        _lock.EnterWriteLock();
        try
        {       
            var newCount = _count + 1;          
            EnsureCapacity(newCount);           
            _arr[_count] = item;
            _count = newCount;                  
        }
        finally
        {
            _lock.ExitWriteLock();
        }       
    }

    public void AddRange(IEnumerable<T> items)
    {
        if (items == null)
            throw new ArgumentNullException("items");

        _lock.EnterWriteLock();

        try
        {           
            var arr = items as T[] ?? items.ToArray();          
            var newCount = _count + arr.Length;
            EnsureCapacity(newCount);           
            Array.Copy(arr, 0, _arr, _count, arr.Length);       
            _count = newCount;
        }
        finally
        {
            _lock.ExitWriteLock();          
        }
    }

    private void EnsureCapacity(int capacity)
    {   
        if (_arr.Length >= capacity)
            return;

        int doubled;
        checked
        {
            try
            {           
                doubled = _arr.Length * 2;
            }
            catch (OverflowException)
            {
                doubled = int.MaxValue;
            }
        }

        var newLength = Math.Max(doubled, capacity);            
        Array.Resize(ref _arr, newLength);
    }

    public bool Remove(T item)
    {
        _lock.EnterUpgradeableReadLock();

        try
        {           
            var i = IndexOfInternal(item);

            if (i == -1)
                return false;

            _lock.EnterWriteLock();
            try
            {   
                RemoveAtInternal(i);
                return true;
            }
            finally
            {               
                _lock.ExitWriteLock();
            }
        }
        finally
        {           
            _lock.ExitUpgradeableReadLock();
        }
    }

    public IEnumerator<T> GetEnumerator()
    {
        _lock.EnterReadLock();

        try
        {    
            for (int i = 0; i < _count; i++)
                // deadlocking potential mitigated by lock recursion enforcement
                yield return _arr[i]; 
        }
        finally
        {           
            _lock.ExitReadLock();
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return this.GetEnumerator();
    }

    public int IndexOf(T item)
    {
        _lock.EnterReadLock();
        try
        {   
            return IndexOfInternal(item);
        }
        finally
        {
            _lock.ExitReadLock();
        }
    }

    private int IndexOfInternal(T item)
    {
        return Array.FindIndex(_arr, 0, _count, x => x.Equals(item));
    }

    public void Insert(int index, T item)
    {
        _lock.EnterUpgradeableReadLock();

        try
        {                       
            if (index > _count)
                throw new ArgumentOutOfRangeException("index"); 

            _lock.EnterWriteLock();
            try
            {       
                var newCount = _count + 1;
                EnsureCapacity(newCount);

                // shift everything right by one, starting at index
                Array.Copy(_arr, index, _arr, index + 1, _count - index);

                // insert
                _arr[index] = item;     
                _count = newCount;
            }
            finally
            {           
                _lock.ExitWriteLock();
            }
        }
        finally
        {
            _lock.ExitUpgradeableReadLock();            
        }


    }

    public void RemoveAt(int index)
    {   
        _lock.EnterUpgradeableReadLock();
        try
        {   
            if (index >= _count)
                throw new ArgumentOutOfRangeException("index");

            _lock.EnterWriteLock();
            try
            {           
                RemoveAtInternal(index);
            }
            finally
            {
                _lock.ExitWriteLock();
            }
        }
        finally
        {
            _lock.ExitUpgradeableReadLock();            
        }
    }

    private void RemoveAtInternal(int index)
    {           
        Array.Copy(_arr, index + 1, _arr, index, _count - index-1);
        _count--;

        // release last element
        Array.Clear(_arr, _count, 1);
    }

    public void Clear()
    {
        _lock.EnterWriteLock();
        try
        {        
            Array.Clear(_arr, 0, _count);
            _count = 0;
        }
        finally
        {           
            _lock.ExitWriteLock();
        }   
    }

    public bool Contains(T item)
    {
        _lock.EnterReadLock();
        try
        {   
            return IndexOfInternal(item) != -1;
        }
        finally
        {           
            _lock.ExitReadLock();
        }
    }

    public void CopyTo(T[] array, int arrayIndex)
    {       
        _lock.EnterReadLock();
        try
        {           
            if(_count > array.Length - arrayIndex)
                throw new ArgumentException("Destination array was not long enough.");

            Array.Copy(_arr, 0, array, arrayIndex, _count);
        }
        finally
        {
            _lock.ExitReadLock();           
        }
    }

    public bool IsReadOnly
    {   
        get { return false; }
    }

    public T this[int index]
    {
        get
        {
            _lock.EnterReadLock();
            try
            {           
                if (index >= _count)
                    throw new ArgumentOutOfRangeException("index");

                return _arr[index]; 
            }
            finally
            {
                _lock.ExitReadLock();               
            }           
        }
        set
        {
            _lock.EnterUpgradeableReadLock();
            try
            {

                if (index >= _count)
                    throw new ArgumentOutOfRangeException("index");

                _lock.EnterWriteLock();
                try
                {                       
                    _arr[index] = value;
                }
                finally
                {
                    _lock.ExitWriteLock();              
                }
            }
            finally
            {
                _lock.ExitUpgradeableReadLock();
            }

        }
    }

    public void DoSync(Action<ConcurrentList<T>> action)
    {
        GetSync(l =>
        {
            action(l);
            return 0;
        });
    }

    public TResult GetSync<TResult>(Func<ConcurrentList<T>,TResult> func)
    {
        _lock.EnterWriteLock();
        try
        {           
            return func(this);
        }
        finally
        {
            _lock.ExitWriteLock();
        }
    }

    public void Dispose()
    {   
        _lock.Dispose();
    }
}
Ronnie Overby
źródło
Co się stanie, jeśli dwa wątki dostaną się na początek trybloku Removelub ustawiacza indeksu w tym samym czasie?
James
@James, który nie wydaje się możliwy. Przeczytaj uwagi na msdn.microsoft.com/en-us/library/ ... Po uruchomieniu tego kodu nigdy nie wejdziesz do tej blokady po raz drugi: gist.github.com/ronnieoverby/59b715c3676127a113c3
Ronnie Overby
@Ronny Overby: Ciekawe. Biorąc to pod uwagę, podejrzewam, że działałoby to znacznie lepiej, gdybyś usunął UpgradableReadLock ze wszystkich funkcji, w których jedyna operacja wykonana w czasie między blokadą odczytu do aktualizacji a blokadą zapisu - narzut związany z podjęciem jakiejkolwiek blokady jest o wiele więcej niż sprawdzenie, czy parametr jest poza zakresem, że samo sprawdzenie wewnątrz blokady zapisu prawdopodobnie działałoby lepiej.
James
Ta klasa również nie wydaje się bardzo przydatna, ponieważ funkcje oparte na przesunięciach (większość z nich) naprawdę nigdy nie mogą być bezpiecznie używane, chyba że istnieje jakiś zewnętrzny schemat blokowania, ponieważ kolekcja może się zmieniać pomiędzy, kiedy zdecydujesz, gdzie umieścić lub weź coś od i kiedy go rzeczywiście dostaniesz.
James
1
Chciałem odnotować, że zdaję sobie sprawę, że przydatność IListsemantyki w scenariuszach równoległych jest co najwyżej ograniczona. Napisałem ten kod prawdopodobnie zanim doszedłem do tej realizacji. Moje doświadczenie jest takie samo jak autor akceptowanej odpowiedzi: spróbowałem z tym, co wiedziałem o synchronizacji i IList <T> i nauczyłem się czegoś, robiąc to.
Ronnie Overby