Pule wątków w C ++ 11

139

Odpowiednie pytania :

O C ++ 11:

O Boost:


Jak uzyskać pulę wątków, do których mam wysyłać zadania , bez ciągłego ich tworzenia i usuwania? Oznacza to, że trwałe wątki będą ponownie synchronizowane bez łączenia.


Mam kod, który wygląda następująco:

namespace {
  std::vector<std::thread> workers;

  int total = 4;
  int arr[4] = {0};

  void each_thread_does(int i) {
    arr[i] += 2;
  }
}

int main(int argc, char *argv[]) {
  for (int i = 0; i < 8; ++i) { // for 8 iterations,
    for (int j = 0; j < 4; ++j) {
      workers.push_back(std::thread(each_thread_does, j));
    }
    for (std::thread &t: workers) {
      if (t.joinable()) {
        t.join();
      }
    }
    arr[4] = std::min_element(arr, arr+4);
  }
  return 0;
}

Zamiast tworzyć i łączyć wątki w każdej iteracji, wolałbym wysyłać zadania do moich wątków roboczych w każdej iteracji i tworzyć je tylko raz.

Yktula
źródło
2
oto pokrewne pytanie i moja odpowiedź.
didierc
1
myślałeś o używaniu tbb (to Intel, ale darmowe i open source, i robi dokładnie to, co chcesz: po prostu przesyłasz (rekurencyjnie podzielne) zadania i nie martwisz się o wątki)?
Walter
2
Ten projekt FOSS jest moją próbą stworzenia biblioteki puli wątków, sprawdź to, jeśli chcesz. -> code.google.com/p/threadpool11
Etherealone
Co jest złego w używaniu TBB?
Walter

Odpowiedzi:

92

To jest skopiowane z mojej odpowiedzi do innego bardzo podobnego posta, mam nadzieję, że może pomóc:

1) Zacznij od maksymalnej liczby wątków obsługiwanych przez system:

int Num_Threads =  thread::hardware_concurrency();

2) Aby zapewnić wydajną implementację puli wątków, po utworzeniu wątków zgodnie z Num_Threads lepiej nie tworzyć nowych lub niszczyć starych (poprzez łączenie). Wystąpi spadek wydajności, może nawet spowodować, że aplikacja będzie działać wolniej niż wersja szeregowa.

Każdy wątek C ++ 11 powinien działać w swojej funkcji z nieskończoną pętlą, ciągle czekając na nowe zadania do pobrania i uruchomienia.

Oto jak dołączyć taką funkcję do puli wątków:

int Num_Threads = thread::hardware_concurrency();
vector<thread> Pool;
for(int ii = 0; ii < Num_Threads; ii++)
{  Pool.push_back(thread(Infinite_loop_function));}

3) Infinite_loop_function

To jest pętla „while (true)” oczekująca na kolejkę zadań

void The_Pool:: Infinite_loop_function()
{
    while(true)
    {
        {
            unique_lock<mutex> lock(Queue_Mutex);

            condition.wait(lock, []{return !Queue.empty() || terminate_pool});
            Job = Queue.front();
            Queue.pop();
        }
        Job(); // function<void()> type
    }
};

4) Utwórz funkcję dodawania zadania do kolejki

void The_Pool:: Add_Job(function<void()> New_Job)
{
    {
        unique_lock<mutex> lock(Queue_Mutex);
        Queue.push(New_Job);
    }
    condition.notify_one();
}

5) Powiąż dowolną funkcję z kolejką

Pool_Obj.Add_Job(std::bind(&Some_Class::Some_Method, &Some_object));

Po zintegrowaniu tych składników uzyskasz własną dynamiczną pulę wątków. Te wątki zawsze działają, czekając na wykonanie zadania.

Przepraszam jeśli są jakieś błędy składniowe, wpisałem ten kod i mam złą pamięć. Przepraszamy, że nie mogę podać pełnego kodu puli wątków, co naruszyłoby moją integralność zawodową.

Edycja: aby zamknąć pulę, wywołaj metodę shutdown ():

XXXX::shutdown(){
{
    unique_lock<mutex> lock(threadpool_mutex);
    terminate_pool = true;} // use this flag in condition.wait

    condition.notify_all(); // wake up all threads.

    // Join all threads.
    for(std::thread &every_thread : thread_vector)
    {   every_thread.join();}

    thread_vector.clear();  
    stopped = true; // use this flag in destructor, if not set, call shutdown() 
}
Doktorat AP EcE
źródło
Jak masz wektor <thread>, gdy wątek (const thread &) = delete?
Christopher Pisz
1
@ChristopherPisz std::vectornie wymaga, aby jego elementy były kopiowalne. Można korzystać z typów wektorów Move-only ( unique_ptr, thread, future, itd.).
Daniel Langr,
2
W shutdown () powinno to być thread_vector.clear (); zamiast thread_vector.empty (); Poprawny?
sudheerbb
1
Co się dzieje, gdy kończysz pracę i nie ma już pracy?
user877329
1
„Infinite_loop_function” to zabawna nazwa funkcji, która pobiera zadania z kolejki i je wykonuje.
Solomon Slow
87

Możesz użyć biblioteki puli wątków C ++, https://github.com/vit-vit/ctpl .

Następnie kod, który napisałeś, można zastąpić następującym

#include <ctpl.h>  // or <ctpl_stl.h> if ou do not have Boost library

int main (int argc, char *argv[]) {
    ctpl::thread_pool p(2 /* two threads in the pool */);
    int arr[4] = {0};
    std::vector<std::future<void>> results(4);
    for (int i = 0; i < 8; ++i) { // for 8 iterations,
        for (int j = 0; j < 4; ++j) {
            results[j] = p.push([&arr, j](int){ arr[j] +=2; });
        }
        for (int j = 0; j < 4; ++j) {
            results[j].get();
        }
        arr[4] = std::min_element(arr, arr + 4);
    }
}

Otrzymasz żądaną liczbę wątków i nie będziesz ich tworzyć ani usuwać w kółko podczas iteracji.

wit-wit
źródło
11
To powinna być odpowiedź; czytelna, prosta, zwięzła i zgodna ze standardami biblioteka C ++ 11 z pojedynczym nagłówkiem. Świetna robota!
Jonathan H
@ vit-vit czy możesz podać przykład z funkcją? jak wypchnąć funkcję członka klasy oresults[j] = p.push([&arr, j](int){ arr[j] +=2; });
Hani Goc
1
@HaniGoc Po prostu przechwyć instancję przez odniesienie.
Jonathan H
@ vit-vit Wysłano Ci żądanie ściągnięcia w celu ulepszenia wersji STL.
Jonathan H
@ vit-vit: Trudno jest skontaktować się z opiekunem tej biblioteki z pytaniami, podpowiedzią.
einpoklum
66

Pula wątków oznacza, że ​​wszystkie Twoje wątki działają przez cały czas - innymi słowy, funkcja wątku nigdy nie zwraca. Aby nadać wątkom coś sensownego do zrobienia, musisz zaprojektować system komunikacji między wątkami, zarówno w celu poinformowania wątku, że jest coś do zrobienia, jak i do przekazywania rzeczywistych danych roboczych.

Zazwyczaj będzie to wymagało pewnej współbieżnej struktury danych, a każdy wątek przypuszczalnie spałby na jakiejś zmiennej warunkowej, która byłaby powiadamiana, gdy jest do zrobienia. Po otrzymaniu powiadomienia jeden lub kilka wątków budzi się, odzyskuje zadanie ze struktury współbieżnych danych, przetwarza je i zapisuje wynik w analogiczny sposób.

Następnie wątek sprawdzał, czy jest jeszcze więcej pracy, a jeśli nie, wraca do snu.

W rezultacie musisz to wszystko zaprojektować samodzielnie, ponieważ nie ma naturalnego pojęcia „pracy”, które byłoby uniwersalne. To sporo pracy i jest kilka subtelnych problemów, które musisz rozwiązać. (Możesz programować w Go, jeśli podoba Ci się system, który zajmie się zarządzaniem wątkami w tle).

Kerrek SB
źródło
12
„musisz to wszystko zaprojektować samodzielnie” <- tego staram się unikać. Gorutyny wydają się jednak fantastyczne.
Yktula
3
@Yktula: Cóż, to wysoce nietrywialne zadanie. Z Twojego posta nie wynika nawet, jaki rodzaj pracy chcesz wykonać, a to ma fundamentalne znaczenie dla rozwiązania. Możesz zaimplementować Go w C ++, ale będzie to bardzo specyficzna rzecz i połowa ludzi będzie narzekać, że chciałaby czegoś innego.
Kerrek SB
19

Pula wątków to w istocie zbiór wątków, które są powiązane z funkcją działającą jako pętla zdarzeń. Wątki te będą bez końca czekać na wykonanie zadania lub na własne zakończenie.

Zadanie puli wątków ma zapewnić interfejs do przesyłania zadań, definiowania (i być może modyfikowania) zasad uruchamiania tych zadań (reguł planowania, tworzenia instancji wątków, rozmiaru puli) oraz monitorowania stanu wątków i powiązanych zasobów.

Tak więc, aby uzyskać wszechstronną pulę, należy zacząć od określenia, czym jest zadanie, w jaki sposób jest uruchamiane, przerywane, jaki jest wynik (patrz pojęcie obietnicy i przyszłości dla tego pytania), na jakie zdarzenia będą musiały odpowiedzieć wątki do tego, jak sobie z nimi poradzą, w jaki sposób te zdarzenia będą odróżniane od tych, których dotyczą zadania. Jak widać, może to stać się dość skomplikowane i nałożyć ograniczenia na sposób działania wątków, ponieważ rozwiązanie staje się coraz bardziej zaangażowane.

Obecne narzędzia do obsługi zdarzeń są dość proste (*): prymitywy, takie jak muteksy, zmienne warunkowe i kilka dodatkowych abstrakcji (blokady, bariery). Ale w niektórych przypadkach te ograniczenia mogą okazać się nieodpowiednie (zobacz to powiązane pytanie ) i należy powrócić do używania prymitywów.

Należy również rozwiązać inne problemy:

  • sygnał
  • I / O
  • sprzęt (koligacja procesora, konfiguracja heterogeniczna)

Jak by to wyglądało w twoim otoczeniu?

Ta odpowiedź na podobne pytanie wskazuje na istniejącą implementację przeznaczoną do zwiększenia i STL.

Zaproponowałem bardzo prymitywną implementację puli wątków na inne pytanie, które nie rozwiązuje wielu problemów opisanych powyżej. Możesz chcieć na tym zbudować. Możesz również rzucić okiem na istniejące frameworki w innych językach, aby znaleźć inspirację.


(*) Nie uważam tego za problem, wręcz przeciwnie. Myślę, że to duch C ++ odziedziczony po C.

didierc
źródło
7
Follwoing [PhD EcE](https://stackoverflow.com/users/3818417/phd-ece) suggestion, I implemented the thread pool:

function_pool.h

#pragma once
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <cassert>

class Function_pool
{

private:
    std::queue<std::function<void()>> m_function_queue;
    std::mutex m_lock;
    std::condition_variable m_data_condition;
    std::atomic<bool> m_accept_functions;

public:

    Function_pool();
    ~Function_pool();
    void push(std::function<void()> func);
    void done();
    void infinite_loop_func();
};

function_pool.cpp

#include "function_pool.h"

Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition(), m_accept_functions(true)
{
}

Function_pool::~Function_pool()
{
}

void Function_pool::push(std::function<void()> func)
{
    std::unique_lock<std::mutex> lock(m_lock);
    m_function_queue.push(func);
    // when we send the notification immediately, the consumer will try to get the lock , so unlock asap
    lock.unlock();
    m_data_condition.notify_one();
}

void Function_pool::done()
{
    std::unique_lock<std::mutex> lock(m_lock);
    m_accept_functions = false;
    lock.unlock();
    // when we send the notification immediately, the consumer will try to get the lock , so unlock asap
    m_data_condition.notify_all();
    //notify all waiting threads.
}

void Function_pool::infinite_loop_func()
{
    std::function<void()> func;
    while (true)
    {
        {
            std::unique_lock<std::mutex> lock(m_lock);
            m_data_condition.wait(lock, [this]() {return !m_function_queue.empty() || !m_accept_functions; });
            if (!m_accept_functions && m_function_queue.empty())
            {
                //lock will be release automatically.
                //finish the thread loop and let it join in the main thread.
                return;
            }
            func = m_function_queue.front();
            m_function_queue.pop();
            //release the lock
        }
        func();
    }
}

main.cpp

#include "function_pool.h"
#include <string>
#include <iostream>
#include <mutex>
#include <functional>
#include <thread>
#include <vector>

Function_pool func_pool;

class quit_worker_exception : public std::exception {};

void example_function()
{
    std::cout << "bla" << std::endl;
}

int main()
{
    std::cout << "stating operation" << std::endl;
    int num_threads = std::thread::hardware_concurrency();
    std::cout << "number of threads = " << num_threads << std::endl;
    std::vector<std::thread> thread_pool;
    for (int i = 0; i < num_threads; i++)
    {
        thread_pool.push_back(std::thread(&Function_pool::infinite_loop_func, &func_pool));
    }

    //here we should send our functions
    for (int i = 0; i < 50; i++)
    {
        func_pool.push(example_function);
    }
    func_pool.done();
    for (unsigned int i = 0; i < thread_pool.size(); i++)
    {
        thread_pool.at(i).join();
    }
}
pio
źródło
3
Dzięki! To naprawdę pomogło mi w rozpoczęciu równoległych operacji wątków. Skończyło się na nieco zmodyfikowanej wersji twojej implementacji.
Robbie Capps
1
Nie musisz m_accept_functions być typu atomowego. m_accept_functions chronione przez mutex.
dmikos
4

Możesz użyć thread_pool z biblioteki boost:

void my_task(){...}

int main(){
    int threadNumbers = thread::hardware_concurrency();
    boost::asio::thread_pool pool(threadNumbers);

    // Submit a function to the pool.
    boost::asio::post(pool, my_task);

    // Submit a lambda object to the pool.
    boost::asio::post(pool, []() {
      ...
    });
}

Możesz także użyć Threadpool ze społeczności open source:

void first_task() {...}    
void second_task() {...}

int main(){
    int threadNumbers = thread::hardware_concurrency();
    pool tp(threadNumbers);

    // Add some tasks to the pool.
    tp.schedule(&first_task);
    tp.schedule(&second_task);
}
Amir Fo
źródło
3

Coś takiego może pomóc (pobrane z działającej aplikacji).

#include <memory>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

struct thread_pool {
  typedef std::unique_ptr<boost::asio::io_service::work> asio_worker;

  thread_pool(int threads) :service(), service_worker(new asio_worker::element_type(service)) {
    for (int i = 0; i < threads; ++i) {
      auto worker = [this] { return service.run(); };
      grp.add_thread(new boost::thread(worker));
    }
  }

  template<class F>
  void enqueue(F f) {
    service.post(f);
  }

  ~thread_pool() {
    service_worker.reset();
    grp.join_all();
    service.stop();
  }

private:
  boost::asio::io_service service;
  asio_worker service_worker;
  boost::thread_group grp;
};

Możesz go używać w ten sposób:

thread_pool pool(2);

pool.enqueue([] {
  std::cout << "Hello from Task 1\n";
});

pool.enqueue([] {
  std::cout << "Hello from Task 2\n";
});

Należy pamiętać, że ponowne wynalezienie wydajnego mechanizmu kolejkowania asynchronicznego nie jest trywialne.

Boost :: asio :: io_service jest bardzo wydajną implementacją, a właściwie jest zbiorem opakowań specyficznych dla platformy (np. Opakowuje porty zakończenia operacji we / wy w systemie Windows).

rustyx
źródło
2
Czy w C ++ 11 jest to konieczne? Powiedzmy, czy nie std::threadwystarczy?
einpoklum
Nie ma odpowiednika w stdfor boost::thread_group. boost::thread_groupto zbiór boost::threadinstancji. Ale oczywiście bardzo łatwo jest zastąpić boost::thread_groupa vectorz std::threads.
rustyx
3

Edycja: teraz wymaga C ++ 17 i koncepcji. (Od 9/12/16 wystarczy tylko g ++ 6.0+.)

Odliczenie szablonu jest jednak o wiele dokładniejsze z tego powodu, więc warto zdobyć nowszy kompilator. Nie znalazłem jeszcze funkcji wymagającej jawnych argumentów szablonu.

Zajmuje teraz również dowolny odpowiedni wywoływalny obiekt ( i nadal jest statycznie bezpieczny !!! ).

Zawiera teraz również opcjonalną pulę wątków priorytetowych zielonych wątków korzystającą z tego samego interfejsu API. Ta klasa jest jednak tylko POSIX. Używa ucontext_tAPI do przełączania zadań w przestrzeni użytkownika.


Stworzyłem do tego prostą bibliotekę. Przykład użycia podano poniżej. (Odpowiadam na to, ponieważ była to jedna z rzeczy, które znalazłem, zanim zdecydowałem, że trzeba to napisać samodzielnie.)

bool is_prime(int n){
  // Determine if n is prime.
}

int main(){
  thread_pool pool(8); // 8 threads

  list<future<bool>> results;
  for(int n = 2;n < 10000;n++){
    // Submit a job to the pool.
    results.emplace_back(pool.async(is_prime, n));
  }

  int n = 2;
  for(auto i = results.begin();i != results.end();i++, n++){
    // i is an iterator pointing to a future representing the result of is_prime(n)
    cout << n << " ";
    bool prime = i->get(); // Wait for the task is_prime(n) to finish and get the result.
    if(prime)
      cout << "is prime";
    else
      cout << "is not prime";
    cout << endl;
  }  
}

Możesz przekazać asyncdowolną funkcję z dowolną (lub void) wartością zwracaną i dowolnymi (lub żadnymi) argumentami, a zwróci ona odpowiednią std::future. Aby uzyskać wynik (lub po prostu poczekać, aż zadanie się zakończy), dzwonisz get()do przyszłości.

Oto github: https://github.com/Tyler-Hardin/thread_pool .

Tyler
źródło
1
Wygląda niesamowicie, ale byłoby wspaniale mieć porównanie do nagłówka vit-vit!
Jonathan H
1
@ Sh3ljohn, patrząc na to, wygląda na to, że są w zasadzie takie same w API. vit-vit używa kolejki bez blokady, która jest lepsza niż moja. (Ale moim celem było zrobienie tego z tylko std :: *. Przypuszczam, że mógłbym sam zaimplementować kolejkę bez blokady, ale brzmi to ciężko i podatnie na błędy). Ponadto, vit-vit nie ma powiązanego pliku .cpp, który jest prostszy w użyciu dla osób, które nie wiedzą, co robią. (Np. Github.com/Tyler-Hardin/thread_pool/issues/1 )
Tyler
Ma również rozwiązanie tylko dla STL, które rozwidlałem przez ostatnie kilka godzin, na początku wyglądało to na bardziej skomplikowane niż twoje ze wspólnymi wskaźnikami w całym miejscu, ale jest to faktycznie potrzebne do poprawnej obsługi zmiany rozmiaru na gorąco.
Jonathan H
@ Sh3ljohn, ah, nie zauważyłem gorącej zmiany rozmiaru. To miłe. Zdecydowałem się nie martwić, ponieważ tak naprawdę nie jest to zgodne z zamierzonym zastosowaniem. (Nie przychodzi mi do głowy przypadek, w którym chciałbym zmienić rozmiar osobiście, ale może to wynikać z braku wyobraźni.)
Tyler
1
Przykładowy przypadek użycia: korzystasz z RESTful API na serwerze i musisz tymczasowo zmniejszyć alokację zasobów do celów konserwacyjnych, bez konieczności całkowitego zamykania usługi.
Jonathan H
3

Jest to kolejna implementacja puli wątków, która jest bardzo prosta, łatwa do zrozumienia i użycia, używa tylko standardowej biblioteki C ++ 11 i może być przeglądana lub modyfikowana do twoich zastosowań, powinna być dobrym początkiem, jeśli chcesz zacząć używać wątków totalizator piłkarski:

https://github.com/progschj/ThreadPool

Zimorodek
źródło
1

Pula wątków bez zależności poza STL jest całkowicie możliwa. Niedawno napisałem małą bibliotekę wątków z samym nagłówkiem, aby rozwiązać ten sam problem. Obsługuje dynamiczną zmianę rozmiaru puli (zmianę liczby pracowników w czasie wykonywania), oczekiwanie, zatrzymywanie, wstrzymywanie, wznawianie i tak dalej. Mam nadzieję, że uznasz to za przydatne.

cantordust
źródło
wygląda na to, że usunąłeś swoje konto github (lub masz zły link). Czy przeniosłeś ten kod gdzie indziej?
rtpax
1
@rtpax Przeniosłem repozytorium - zaktualizowałem odpowiedź, aby to odzwierciedlić.
cantordust