C ++ 0x nie ma semaforów? Jak synchronizować wątki?

135

Czy to prawda, że ​​C ++ 0x przyjdzie bez semaforów? Istnieje już kilka pytań dotyczących przepełnienia stosu dotyczących używania semaforów. Używam ich (semaforów posix) cały czas, aby pozwolić wątkowi czekać na jakieś zdarzenie w innym wątku:

void thread0(...)
{
  doSomething0();

  event1.wait();

  ...
}

void thread1(...)
{
  doSomething1();

  event1.post();

  ...
}

Gdybym zrobił to z muteksem:

void thread0(...)
{
  doSomething0();

  event1.lock(); event1.unlock();

  ...
}

void thread1(...)
{
  event1.lock();

  doSomethingth1();

  event1.unlock();

  ...
}

Problem: Jest brzydki i nie ma gwarancji, że Thread1 najpierw zablokuje muteks (biorąc pod uwagę, że ten sam wątek powinien blokować i odblokowywać muteks, nie możesz również zablokować zdarzenia1 przed uruchomieniem wątku0 i wątku1).

Skoro więc doładowanie również nie ma semaforów, jaki jest najprostszy sposób osiągnięcia powyższego?

tauran
źródło
Może użyć warunku mutex i std :: promise i std :: future?
Yves,

Odpowiedzi:

180

Możesz łatwo zbudować go z muteksu i zmiennej warunku:

#include <mutex>
#include <condition_variable>

class semaphore
{
private:
    std::mutex mutex_;
    std::condition_variable condition_;
    unsigned long count_ = 0; // Initialized as locked.

public:
    void notify() {
        std::lock_guard<decltype(mutex_)> lock(mutex_);
        ++count_;
        condition_.notify_one();
    }

    void wait() {
        std::unique_lock<decltype(mutex_)> lock(mutex_);
        while(!count_) // Handle spurious wake-ups.
            condition_.wait(lock);
        --count_;
    }

    bool try_wait() {
        std::lock_guard<decltype(mutex_)> lock(mutex_);
        if(count_) {
            --count_;
            return true;
        }
        return false;
    }
};
Maxim Egorushkin
źródło
96
ktoś powinien złożyć propozycję do
7
komentarz tutaj, który początkowo mnie zdziwił, to blokada w oczekiwaniu, można zapytać, w jaki sposób wątek może przejść przez powiadomienie, jeśli blokada jest wstrzymana przez czekanie? nieco słabo udokumentowana odpowiedź jest taka, że ​​zmienna condition_variable.wait pulsuje blokadą, umożliwiając innemu
31
Celowo został wykluczony z Boost na tej podstawie, że semafor to zbyt dużo liny, by programiści mogli się na nim powiesić. Zmienne warunkowe podobno są łatwiejsze do zarządzania. Widzę ich punkt widzenia, ale czuję się trochę protekcjonalny. Zakładam, że ta sama logika odnosi się do C ++ 11 - od programistów oczekuje się pisania programów w sposób, który „naturalnie” używa condvarów lub innych zatwierdzonych technik synchronizacji. Podanie semafora działałoby przeciwko temu, niezależnie od tego, czy jest zaimplementowane na wierzchu condvara, czy natywnie.
Steve Jessop
5
Uwaga - uzasadnienie pętli można znaleźć na stronie en.wikipedia.org/wiki/Spurious_wakeupwhile(!count_) .
Dan Nissenbaum
3
@Maxim Przepraszam, nie sądzę, że masz rację. sem_wait i sem_post tylko syscall również w przypadku rywalizacji (sprawdź sourceware.org/git/?p=glibc.git;a=blob;f=nptl/sem_wait.c ), więc kod tutaj kończy się powielaniem implementacji libc, z potencjalnymi błędami. Jeśli zamierzasz przenosić w dowolnym systemie, może to być rozwiązanie, ale jeśli potrzebujesz tylko kompatybilności z Posix, użyj semafora Posix.
xryl669,
107

Bazując na odpowiedzi Maxima Yegorushkina , spróbowałem zrobić przykład w stylu C ++ 11.

#include <mutex>
#include <condition_variable>

class Semaphore {
public:
    Semaphore (int count_ = 0)
        : count(count_) {}

    inline void notify()
    {
        std::unique_lock<std::mutex> lock(mtx);
        count++;
        cv.notify_one();
    }

    inline void wait()
    {
        std::unique_lock<std::mutex> lock(mtx);

        while(count == 0){
            cv.wait(lock);
        }
        count--;
    }

private:
    std::mutex mtx;
    std::condition_variable cv;
    int count;
};
Tsuneo Yoshioka
źródło
34
Możesz zrobić czekać () również trzy-liniową:cv.wait(lck, [this]() { return count > 0; });
Domi
2
Pomocne jest również dodanie kolejnej klasy w duchu lock_guard. W sposób RAII konstruktor, który przyjmuje semafor jako odniesienie, wywołuje wywołanie wait () semafora, a destruktor wywołuje jego wywołanie notify (). Zapobiega to niepowodzeniu zwolnienia semafora przez wyjątki.
Jim Hunziker
czy nie ma martwej blokady, jeśli powiedzmy, że N wątków nazywa się wait () i count == 0, to cv.notify_one (); nigdy nie jest wywoływany, ponieważ mtx nie został wydany?
Marcello
1
@Marcello Oczekujące wątki nie posiadają blokady. Celem zmiennych warunków jest zapewnienie atomowej operacji „odblokuj i poczekaj”.
David Schwartz
3
Powinieneś zwolnić blokadę przed wywołaniem notify_one (), aby uniknąć natychmiastowego blokowania wybudzania ... patrz tutaj: en.cppreference.com/w/cpp/thread/condition_variable/notify_all
kylefinn
38

Zdecydowałem się napisać najsolidniejszy / generyczny semafor C ++ 11, jaki mogłem, w stylu standardowym, jak tylko potrafię (uwaga using semaphore = ..., normalnie używałbyś nazwy semaphorepodobnej do normalnie używanej stringnie basic_string):

template <typename Mutex, typename CondVar>
class basic_semaphore {
public:
    using native_handle_type = typename CondVar::native_handle_type;

    explicit basic_semaphore(size_t count = 0);
    basic_semaphore(const basic_semaphore&) = delete;
    basic_semaphore(basic_semaphore&&) = delete;
    basic_semaphore& operator=(const basic_semaphore&) = delete;
    basic_semaphore& operator=(basic_semaphore&&) = delete;

    void notify();
    void wait();
    bool try_wait();
    template<class Rep, class Period>
    bool wait_for(const std::chrono::duration<Rep, Period>& d);
    template<class Clock, class Duration>
    bool wait_until(const std::chrono::time_point<Clock, Duration>& t);

    native_handle_type native_handle();

private:
    Mutex   mMutex;
    CondVar mCv;
    size_t  mCount;
};

using semaphore = basic_semaphore<std::mutex, std::condition_variable>;

template <typename Mutex, typename CondVar>
basic_semaphore<Mutex, CondVar>::basic_semaphore(size_t count)
    : mCount{count}
{}

template <typename Mutex, typename CondVar>
void basic_semaphore<Mutex, CondVar>::notify() {
    std::lock_guard<Mutex> lock{mMutex};
    ++mCount;
    mCv.notify_one();
}

template <typename Mutex, typename CondVar>
void basic_semaphore<Mutex, CondVar>::wait() {
    std::unique_lock<Mutex> lock{mMutex};
    mCv.wait(lock, [&]{ return mCount > 0; });
    --mCount;
}

template <typename Mutex, typename CondVar>
bool basic_semaphore<Mutex, CondVar>::try_wait() {
    std::lock_guard<Mutex> lock{mMutex};

    if (mCount > 0) {
        --mCount;
        return true;
    }

    return false;
}

template <typename Mutex, typename CondVar>
template<class Rep, class Period>
bool basic_semaphore<Mutex, CondVar>::wait_for(const std::chrono::duration<Rep, Period>& d) {
    std::unique_lock<Mutex> lock{mMutex};
    auto finished = mCv.wait_for(lock, d, [&]{ return mCount > 0; });

    if (finished)
        --mCount;

    return finished;
}

template <typename Mutex, typename CondVar>
template<class Clock, class Duration>
bool basic_semaphore<Mutex, CondVar>::wait_until(const std::chrono::time_point<Clock, Duration>& t) {
    std::unique_lock<Mutex> lock{mMutex};
    auto finished = mCv.wait_until(lock, t, [&]{ return mCount > 0; });

    if (finished)
        --mCount;

    return finished;
}

template <typename Mutex, typename CondVar>
typename basic_semaphore<Mutex, CondVar>::native_handle_type basic_semaphore<Mutex, CondVar>::native_handle() {
    return mCv.native_handle();
}
David
źródło
To działa, z niewielką zmianą. Te wait_fori wait_untilwywołania metody z orzecznika zwrócić wartość logiczną (a nie `std :: cv_status).
jdknight
przepraszam, że tak późno w grze wybrałem nitkę std::size_tjest bez znaku, więc zmniejszenie go poniżej zera to UB i zawsze będzie >= 0. IMHO countpowinno być int.
Richard Hodges
3
@RichardHodges nie ma możliwości zmniejszenia poniżej zera, więc nie ma problemu, a co oznaczałoby ujemne obliczenie semafora? To nawet nie ma sensu IMO.
David
1
@David A co by było, gdyby wątek musiał czekać, aż inni zainicjują coś? na przykład 1 wątek czytnika czeka na 4 wątki, wywołałbym konstruktor semafora z -3, aby wątek czytnika czekał, aż wszystkie inne wątki utworzą post. Sądzę, że są inne sposoby, aby to zrobić, ale czy nie jest to rozsądne? Myślę, że w rzeczywistości jest to pytanie, które zadaje OP, ale z większą liczbą „wątków1”.
jmmut
2
@RichardHodges jest bardzo pedantyczny, zmniejszanie liczby całkowitej bez znaku poniżej 0 nie jest UB.
jcai
15

zgodnie z semaforami posix, dodałbym

class semaphore
{
    ...
    bool trywait()
    {
        boost::mutex::scoped_lock lock(mutex_);
        if(count_)
        {
            --count_;
            return true;
        }
        else
        {
            return false;
        }
    }
};

I zdecydowanie wolę używać mechanizmu synchronizacji na dogodnym poziomie abstrakcji, zamiast zawsze kopiować wklejanie zszytej wersji przy użyciu bardziej podstawowych operatorów.

Michael Zillich
źródło
9

Możesz również sprawdzić cpp11-on-multicore - ma przenośną i optymalną implementację semafora.

Repozytorium zawiera również inne dodatki do wątków, które uzupełniają wątki w języku c ++ 11.

onqtam
źródło
8

Możesz pracować ze zmiennymi mutex i warunkowymi. Uzyskujesz wyłączny dostęp z muteksem, sprawdź, czy chcesz kontynuować, czy musisz czekać na drugi koniec. Jeśli musisz czekać, czekasz w stanie. Gdy inny wątek stwierdzi, że możesz kontynuować, sygnalizuje stan.

W bibliotece boost :: thread jest krótki przykład , który najprawdopodobniej możesz po prostu skopiować (biblioteki wątków C ++ 0x i boost są bardzo podobne).

David Rodríguez - dribeas
źródło
Stan sygnalizuje tylko oczekujące wątki, czy nie? Więc jeśli wątek0 nie czeka na sygnał wątku1, zostanie później zablokowany? Plus: nie potrzebuję dodatkowego zamka, który jest dostarczany z warunkiem - to narzut.
tauran
Tak, warunek sygnalizuje tylko oczekujące wątki. Typowym wzorcem jest posiadanie zmiennej ze stanem i warunkiem na wypadek, gdybyś musiał czekać. Pomyśl o producencie / odbiorcy, będzie zliczanie pozycji w buforze, producent blokuje, dodaje element, zwiększa liczbę i sygnalizuje. Konsument blokuje, sprawdza licznik i jeśli wartość niezerowa zużywa, a jeśli zero czeka w warunku.
David Rodríguez - dribeas
2
Możesz zasymulować semafor w ten sposób: Zainicjuj zmienną wartością, którą nadasz semaforowi, a następnie wait()przetłumacz ją na "blokuj, sprawdź licznik, jeśli nie zerową dekrementację i kontynuuj; jeśli zero czeka na warunek" while postbędzie "blokada, licznik przyrostów, sygnał, jeśli było 0 "
David Rodríguez - dribeas
Tak, brzmi dobrze. Zastanawiam się, czy semafory Posix są zaimplementowane w ten sam sposób.
tauran
@tauran: Nie wiem na pewno (i może to zależeć od tego, który Posix OS), ale myślę, że mało prawdopodobne. Semafory są tradycyjnie prymitywem synchronizacji „niższego poziomu” niż muteksy i zmienne warunkowe i w zasadzie mogą być bardziej wydajne niż byłyby zaimplementowane na warstwie condvar. Dlatego bardziej prawdopodobne w danym systemie operacyjnym jest to, że wszystkie prymitywy synchronizacji na poziomie użytkownika są zbudowane na podstawie niektórych typowych narzędzi, które współpracują z harmonogramem.
Steve Jessop
3

Może być również przydatna otoka semafora RAII w wątkach:

class ScopedSemaphore
{
public:
    explicit ScopedSemaphore(Semaphore& sem) : m_Semaphore(sem) { m_Semaphore.Wait(); }
    ScopedSemaphore(const ScopedSemaphore&) = delete;
    ~ScopedSemaphore() { m_Semaphore.Notify(); }

   ScopedSemaphore& operator=(const ScopedSemaphore&) = delete;

private:
    Semaphore& m_Semaphore;
};

Przykład użycia w aplikacji wielowątkowej:

boost::ptr_vector<std::thread> threads;
Semaphore semaphore;

for (...)
{
    ...
    auto t = new std::thread([..., &semaphore]
    {
        ScopedSemaphore scopedSemaphore(semaphore);
        ...
    }
    );
    threads.push_back(t);
}

for (auto& t : threads)
    t.join();
slasla
źródło
3

C ++ 20 wreszcie będzie miał semafory - std::counting_semaphore<max_count> .

Będą one miały (przynajmniej) następujące metody:

  • acquire() (bloking)
  • try_acquire() (bez blokowania, zwraca natychmiast)
  • try_acquire_for() (bez blokowania, trwa pewien czas)
  • try_acquire_until() (bez blokowania, zajmuje trochę czasu, aby przestać próbować)
  • release()

Tego jeszcze nie ma na liście cppreference, ale możesz przeczytać te slajdy prezentacji CppCon 2019 lub obejrzeć wideo . Jest też oficjalna propozycja P0514R4 , ale nie jestem pewien, czy jest to najbardziej aktualna wersja.

einpoklum
źródło
2

Okazało się, że shared_ptr i słabe_ptr, długie z listą, wykonały pracę, której potrzebowałem. Mój problem polegał na tym, że kilku klientów chciało wchodzić w interakcje z wewnętrznymi danymi hosta. Zazwyczaj host aktualizuje dane samodzielnie, jednak jeśli klient tego zażąda, musi przerwać aktualizację, dopóki żaden klient nie uzyska dostępu do danych hosta. W tym samym czasie klient może poprosić o wyłączny dostęp, aby żaden inny klient ani host nie mógł modyfikować danych hosta.

Jak to zrobiłem, stworzyłem strukturę:

struct UpdateLock
{
    typedef std::shared_ptr< UpdateLock > ptr;
};

Każdy klient miałby członka takiego:

UpdateLock::ptr m_myLock;

Wtedy host miałby element slaby_ptr na wyłączność i listę slabych_ptrs dla blokad niewyłącznych:

std::weak_ptr< UpdateLock > m_exclusiveLock;
std::list< std::weak_ptr< UpdateLock > > m_locks;

Dostępna jest funkcja umożliwiająca blokowanie i inna funkcja sprawdzająca, czy host jest zablokowany:

UpdateLock::ptr LockUpdate( bool exclusive );       
bool IsUpdateLocked( bool exclusive ) const;

Testuję blokady w LockUpdate, IsUpdateLocked i okresowo w ramach procedury aktualizacji hosta. Testowanie blokady jest tak proste, jak sprawdzenie, czy słaby_ptr wygasł i usunięcie dowolnego wygasłego z listy m_locks (robię to tylko podczas aktualizacji hosta). Mogę sprawdzić, czy lista jest pusta; w tym samym czasie otrzymuję automatyczne odblokowanie, gdy klient resetuje shared_ptr, na którym się trzyma, co również ma miejsce, gdy klient zostaje automatycznie zniszczony.

Ogólny efekt jest taki, że ponieważ klienci rzadko potrzebują wyłączności (zwykle zarezerwowanej tylko dla dodatków i usunięć), w większości przypadków żądanie LockUpdate (false), to znaczy niewyłączne, kończy się sukcesem, o ile (! M_exclusiveLock). LockUpdate (true), żądanie wyłączności, kończy się sukcesem tylko wtedy, gdy zarówno (! M_exclusiveLock), jak i (m_locks.empty ()).

Można dodać kolejkę, aby złagodzić blokady między wyłącznymi i niewyłącznymi blokadami, jednak do tej pory nie miałem żadnych kolizji, więc zamierzam poczekać, aż to się stanie, aby dodać rozwiązanie (głównie dlatego, że mam rzeczywisty warunek testu).

Jak dotąd działa to dobrze na moje potrzeby; Mogę sobie wyobrazić potrzebę rozszerzenia tego i pewne problemy, które mogą pojawić się podczas rozszerzonego użytkowania, jednak było to szybkie do wdrożenia i wymagało bardzo mało niestandardowego kodu.

Zestaw 10
źródło
-4

Jeśli ktoś jest zainteresowany wersją atomową, oto implementacja. Oczekuje się, że wydajność będzie lepsza niż wersja ze zmiennymi muteksami i warunkami.

class semaphore_atomic
{
public:
    void notify() {
        count_.fetch_add(1, std::memory_order_release);
    }

    void wait() {
        while (true) {
            int count = count_.load(std::memory_order_relaxed);
            if (count > 0) {
                if (count_.compare_exchange_weak(count, count-1, std::memory_order_acq_rel, std::memory_order_relaxed)) {
                    break;
                }
            }
        }
    }

    bool try_wait() {
        int count = count_.load(std::memory_order_relaxed);
        if (count > 0) {
            if (count_.compare_exchange_strong(count, count-1, std::memory_order_acq_rel, std::memory_order_relaxed)) {
                return true;
            }
        }
        return false;
    }
private:
    std::atomic_int count_{0};
};
Jeffery
źródło
4
Spodziewałbym się, że wydajność będzie znacznie gorsza. Ten kod popełnia prawie każdy możliwy błąd. Jako najbardziej oczywisty przykład załóżmy, że waitkod musi się zapętlić kilka razy. Kiedy w końcu się odblokuje, zajmie matkę wszystkich błędnie przewidzianych gałęzi, ponieważ przewidywanie pętli procesora z pewnością przewiduje, że zapętli się ponownie. Mógłbym wymienić wiele innych problemów z tym kodem.
David Schwartz,
1
Oto kolejny oczywisty zabójca wydajności: waitpętla zużywa zasoby mikrowykonań procesora podczas obracania się. Załóżmy, że znajduje się w tym samym fizycznym rdzeniu, co wątek, który do notifyniego należy - to strasznie spowolni ten wątek.
David Schwartz
1
I jeszcze jeden: na procesorach x86 (najpopularniejszych obecnie procesorach) operacja compare_exchange_weak jest zawsze operacją zapisu, nawet jeśli się nie powiedzie (zapisuje tę samą wartość, którą odczytał, jeśli porównanie się nie powiedzie). Załóżmy więc, że oba rdzenie są w waitpętli dla tego samego semafora. Obaj piszą z pełną prędkością do tej samej linii pamięci podręcznej, co może spowolnić inne rdzenie do indeksowania poprzez nasycenie magistrali między rdzeniami.
David Schwartz,
@DavidSchwartz Cieszę się, że widzę Twoje komentarze. Nie jestem pewien, czy rozumiesz część „... przewidywanie pętli procesora ...”. Uzgodniony drugi. Najwyraźniej twój trzeci przypadek może się zdarzyć, ale w porównaniu z muteksem, który powoduje przełączanie trybu użytkownika na tryb jądra i wywołanie systemowe, synchronizacja między rdzeniami nie jest gorsza.
Jeffery,
1
Nie ma czegoś takiego jak semafor bez blokady. Cała idea bycia wolnym od blokad nie polega na pisaniu kodu bez użycia muteksów, ale na pisaniu kodu, w którym wątek nigdy się nie blokuje. W tym przypadku istotą semafora jest blokowanie wątków, które wywołują funkcję wait ()!
Carlo Wood,