Odpowiednie pytania :
O C ++ 11:
- C ++ 11: std :: thread pooled?
- Czy async (launch :: async) w C ++ 11 sprawi, że pule wątków będą przestarzałe, aby uniknąć kosztownego tworzenia wątków?
O Boost:
Jak uzyskać pulę wątków, do których mam wysyłać zadania , bez ciągłego ich tworzenia i usuwania? Oznacza to, że trwałe wątki będą ponownie synchronizowane bez łączenia.
Mam kod, który wygląda następująco:
namespace {
std::vector<std::thread> workers;
int total = 4;
int arr[4] = {0};
void each_thread_does(int i) {
arr[i] += 2;
}
}
int main(int argc, char *argv[]) {
for (int i = 0; i < 8; ++i) { // for 8 iterations,
for (int j = 0; j < 4; ++j) {
workers.push_back(std::thread(each_thread_does, j));
}
for (std::thread &t: workers) {
if (t.joinable()) {
t.join();
}
}
arr[4] = std::min_element(arr, arr+4);
}
return 0;
}
Zamiast tworzyć i łączyć wątki w każdej iteracji, wolałbym wysyłać zadania do moich wątków roboczych w każdej iteracji i tworzyć je tylko raz.
c++
multithreading
c++11
threadpool
stdthread
Yktula
źródło
źródło
Odpowiedzi:
To jest skopiowane z mojej odpowiedzi do innego bardzo podobnego posta, mam nadzieję, że może pomóc:
1) Zacznij od maksymalnej liczby wątków obsługiwanych przez system:
int Num_Threads = thread::hardware_concurrency();
2) Aby zapewnić wydajną implementację puli wątków, po utworzeniu wątków zgodnie z Num_Threads lepiej nie tworzyć nowych lub niszczyć starych (poprzez łączenie). Wystąpi spadek wydajności, może nawet spowodować, że aplikacja będzie działać wolniej niż wersja szeregowa.
Każdy wątek C ++ 11 powinien działać w swojej funkcji z nieskończoną pętlą, ciągle czekając na nowe zadania do pobrania i uruchomienia.
Oto jak dołączyć taką funkcję do puli wątków:
int Num_Threads = thread::hardware_concurrency(); vector<thread> Pool; for(int ii = 0; ii < Num_Threads; ii++) { Pool.push_back(thread(Infinite_loop_function));}
3) Infinite_loop_function
To jest pętla „while (true)” oczekująca na kolejkę zadań
void The_Pool:: Infinite_loop_function() { while(true) { { unique_lock<mutex> lock(Queue_Mutex); condition.wait(lock, []{return !Queue.empty() || terminate_pool}); Job = Queue.front(); Queue.pop(); } Job(); // function<void()> type } };
4) Utwórz funkcję dodawania zadania do kolejki
void The_Pool:: Add_Job(function<void()> New_Job) { { unique_lock<mutex> lock(Queue_Mutex); Queue.push(New_Job); } condition.notify_one(); }
5) Powiąż dowolną funkcję z kolejką
Pool_Obj.Add_Job(std::bind(&Some_Class::Some_Method, &Some_object));
Po zintegrowaniu tych składników uzyskasz własną dynamiczną pulę wątków. Te wątki zawsze działają, czekając na wykonanie zadania.
Przepraszam jeśli są jakieś błędy składniowe, wpisałem ten kod i mam złą pamięć. Przepraszamy, że nie mogę podać pełnego kodu puli wątków, co naruszyłoby moją integralność zawodową.
Edycja: aby zamknąć pulę, wywołaj metodę shutdown ():
XXXX::shutdown(){ { unique_lock<mutex> lock(threadpool_mutex); terminate_pool = true;} // use this flag in condition.wait condition.notify_all(); // wake up all threads. // Join all threads. for(std::thread &every_thread : thread_vector) { every_thread.join();} thread_vector.clear(); stopped = true; // use this flag in destructor, if not set, call shutdown() }
źródło
std::vector
nie wymaga, aby jego elementy były kopiowalne. Można korzystać z typów wektorów Move-only (unique_ptr
,thread
,future
, itd.).Możesz użyć biblioteki puli wątków C ++, https://github.com/vit-vit/ctpl .
Następnie kod, który napisałeś, można zastąpić następującym
#include <ctpl.h> // or <ctpl_stl.h> if ou do not have Boost library int main (int argc, char *argv[]) { ctpl::thread_pool p(2 /* two threads in the pool */); int arr[4] = {0}; std::vector<std::future<void>> results(4); for (int i = 0; i < 8; ++i) { // for 8 iterations, for (int j = 0; j < 4; ++j) { results[j] = p.push([&arr, j](int){ arr[j] +=2; }); } for (int j = 0; j < 4; ++j) { results[j].get(); } arr[4] = std::min_element(arr, arr + 4); } }
Otrzymasz żądaną liczbę wątków i nie będziesz ich tworzyć ani usuwać w kółko podczas iteracji.
źródło
results[j] = p.push([&arr, j](int){ arr[j] +=2; });
Pula wątków oznacza, że wszystkie Twoje wątki działają przez cały czas - innymi słowy, funkcja wątku nigdy nie zwraca. Aby nadać wątkom coś sensownego do zrobienia, musisz zaprojektować system komunikacji między wątkami, zarówno w celu poinformowania wątku, że jest coś do zrobienia, jak i do przekazywania rzeczywistych danych roboczych.
Zazwyczaj będzie to wymagało pewnej współbieżnej struktury danych, a każdy wątek przypuszczalnie spałby na jakiejś zmiennej warunkowej, która byłaby powiadamiana, gdy jest do zrobienia. Po otrzymaniu powiadomienia jeden lub kilka wątków budzi się, odzyskuje zadanie ze struktury współbieżnych danych, przetwarza je i zapisuje wynik w analogiczny sposób.
Następnie wątek sprawdzał, czy jest jeszcze więcej pracy, a jeśli nie, wraca do snu.
W rezultacie musisz to wszystko zaprojektować samodzielnie, ponieważ nie ma naturalnego pojęcia „pracy”, które byłoby uniwersalne. To sporo pracy i jest kilka subtelnych problemów, które musisz rozwiązać. (Możesz programować w Go, jeśli podoba Ci się system, który zajmie się zarządzaniem wątkami w tle).
źródło
Pula wątków to w istocie zbiór wątków, które są powiązane z funkcją działającą jako pętla zdarzeń. Wątki te będą bez końca czekać na wykonanie zadania lub na własne zakończenie.
Zadanie puli wątków ma zapewnić interfejs do przesyłania zadań, definiowania (i być może modyfikowania) zasad uruchamiania tych zadań (reguł planowania, tworzenia instancji wątków, rozmiaru puli) oraz monitorowania stanu wątków i powiązanych zasobów.
Tak więc, aby uzyskać wszechstronną pulę, należy zacząć od określenia, czym jest zadanie, w jaki sposób jest uruchamiane, przerywane, jaki jest wynik (patrz pojęcie obietnicy i przyszłości dla tego pytania), na jakie zdarzenia będą musiały odpowiedzieć wątki do tego, jak sobie z nimi poradzą, w jaki sposób te zdarzenia będą odróżniane od tych, których dotyczą zadania. Jak widać, może to stać się dość skomplikowane i nałożyć ograniczenia na sposób działania wątków, ponieważ rozwiązanie staje się coraz bardziej zaangażowane.
Obecne narzędzia do obsługi zdarzeń są dość proste (*): prymitywy, takie jak muteksy, zmienne warunkowe i kilka dodatkowych abstrakcji (blokady, bariery). Ale w niektórych przypadkach te ograniczenia mogą okazać się nieodpowiednie (zobacz to powiązane pytanie ) i należy powrócić do używania prymitywów.
Należy również rozwiązać inne problemy:
Jak by to wyglądało w twoim otoczeniu?
Ta odpowiedź na podobne pytanie wskazuje na istniejącą implementację przeznaczoną do zwiększenia i STL.
Zaproponowałem bardzo prymitywną implementację puli wątków na inne pytanie, które nie rozwiązuje wielu problemów opisanych powyżej. Możesz chcieć na tym zbudować. Możesz również rzucić okiem na istniejące frameworki w innych językach, aby znaleźć inspirację.
(*) Nie uważam tego za problem, wręcz przeciwnie. Myślę, że to duch C ++ odziedziczony po C.
źródło
Follwoing [PhD EcE](https://stackoverflow.com/users/3818417/phd-ece) suggestion, I implemented the thread pool:
#pragma once #include <queue> #include <functional> #include <mutex> #include <condition_variable> #include <atomic> #include <cassert> class Function_pool { private: std::queue<std::function<void()>> m_function_queue; std::mutex m_lock; std::condition_variable m_data_condition; std::atomic<bool> m_accept_functions; public: Function_pool(); ~Function_pool(); void push(std::function<void()> func); void done(); void infinite_loop_func(); };
#include "function_pool.h" Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition(), m_accept_functions(true) { } Function_pool::~Function_pool() { } void Function_pool::push(std::function<void()> func) { std::unique_lock<std::mutex> lock(m_lock); m_function_queue.push(func); // when we send the notification immediately, the consumer will try to get the lock , so unlock asap lock.unlock(); m_data_condition.notify_one(); } void Function_pool::done() { std::unique_lock<std::mutex> lock(m_lock); m_accept_functions = false; lock.unlock(); // when we send the notification immediately, the consumer will try to get the lock , so unlock asap m_data_condition.notify_all(); //notify all waiting threads. } void Function_pool::infinite_loop_func() { std::function<void()> func; while (true) { { std::unique_lock<std::mutex> lock(m_lock); m_data_condition.wait(lock, [this]() {return !m_function_queue.empty() || !m_accept_functions; }); if (!m_accept_functions && m_function_queue.empty()) { //lock will be release automatically. //finish the thread loop and let it join in the main thread. return; } func = m_function_queue.front(); m_function_queue.pop(); //release the lock } func(); } }
#include "function_pool.h" #include <string> #include <iostream> #include <mutex> #include <functional> #include <thread> #include <vector> Function_pool func_pool; class quit_worker_exception : public std::exception {}; void example_function() { std::cout << "bla" << std::endl; } int main() { std::cout << "stating operation" << std::endl; int num_threads = std::thread::hardware_concurrency(); std::cout << "number of threads = " << num_threads << std::endl; std::vector<std::thread> thread_pool; for (int i = 0; i < num_threads; i++) { thread_pool.push_back(std::thread(&Function_pool::infinite_loop_func, &func_pool)); } //here we should send our functions for (int i = 0; i < 50; i++) { func_pool.push(example_function); } func_pool.done(); for (unsigned int i = 0; i < thread_pool.size(); i++) { thread_pool.at(i).join(); } }
źródło
Możesz użyć thread_pool z biblioteki boost:
void my_task(){...} int main(){ int threadNumbers = thread::hardware_concurrency(); boost::asio::thread_pool pool(threadNumbers); // Submit a function to the pool. boost::asio::post(pool, my_task); // Submit a lambda object to the pool. boost::asio::post(pool, []() { ... }); }
Możesz także użyć Threadpool ze społeczności open source:
void first_task() {...} void second_task() {...} int main(){ int threadNumbers = thread::hardware_concurrency(); pool tp(threadNumbers); // Add some tasks to the pool. tp.schedule(&first_task); tp.schedule(&second_task); }
źródło
Coś takiego może pomóc (pobrane z działającej aplikacji).
#include <memory> #include <boost/asio.hpp> #include <boost/thread.hpp> struct thread_pool { typedef std::unique_ptr<boost::asio::io_service::work> asio_worker; thread_pool(int threads) :service(), service_worker(new asio_worker::element_type(service)) { for (int i = 0; i < threads; ++i) { auto worker = [this] { return service.run(); }; grp.add_thread(new boost::thread(worker)); } } template<class F> void enqueue(F f) { service.post(f); } ~thread_pool() { service_worker.reset(); grp.join_all(); service.stop(); } private: boost::asio::io_service service; asio_worker service_worker; boost::thread_group grp; };
Możesz go używać w ten sposób:
thread_pool pool(2); pool.enqueue([] { std::cout << "Hello from Task 1\n"; }); pool.enqueue([] { std::cout << "Hello from Task 2\n"; });
Należy pamiętać, że ponowne wynalezienie wydajnego mechanizmu kolejkowania asynchronicznego nie jest trywialne.
Boost :: asio :: io_service jest bardzo wydajną implementacją, a właściwie jest zbiorem opakowań specyficznych dla platformy (np. Opakowuje porty zakończenia operacji we / wy w systemie Windows).
źródło
std::thread
wystarczy?std
forboost::thread_group
.boost::thread_group
to zbiórboost::thread
instancji. Ale oczywiście bardzo łatwo jest zastąpićboost::thread_group
avector
zstd::thread
s.Edycja: teraz wymaga C ++ 17 i koncepcji. (Od 9/12/16 wystarczy tylko g ++ 6.0+.)
Odliczenie szablonu jest jednak o wiele dokładniejsze z tego powodu, więc warto zdobyć nowszy kompilator. Nie znalazłem jeszcze funkcji wymagającej jawnych argumentów szablonu.
Zajmuje teraz również dowolny odpowiedni wywoływalny obiekt ( i nadal jest statycznie bezpieczny !!! ).
Zawiera teraz również opcjonalną pulę wątków priorytetowych zielonych wątków korzystającą z tego samego interfejsu API. Ta klasa jest jednak tylko POSIX. Używa
ucontext_t
API do przełączania zadań w przestrzeni użytkownika.Stworzyłem do tego prostą bibliotekę. Przykład użycia podano poniżej. (Odpowiadam na to, ponieważ była to jedna z rzeczy, które znalazłem, zanim zdecydowałem, że trzeba to napisać samodzielnie.)
bool is_prime(int n){ // Determine if n is prime. } int main(){ thread_pool pool(8); // 8 threads list<future<bool>> results; for(int n = 2;n < 10000;n++){ // Submit a job to the pool. results.emplace_back(pool.async(is_prime, n)); } int n = 2; for(auto i = results.begin();i != results.end();i++, n++){ // i is an iterator pointing to a future representing the result of is_prime(n) cout << n << " "; bool prime = i->get(); // Wait for the task is_prime(n) to finish and get the result. if(prime) cout << "is prime"; else cout << "is not prime"; cout << endl; } }
Możesz przekazać
async
dowolną funkcję z dowolną (lub void) wartością zwracaną i dowolnymi (lub żadnymi) argumentami, a zwróci ona odpowiedniąstd::future
. Aby uzyskać wynik (lub po prostu poczekać, aż zadanie się zakończy), dzwoniszget()
do przyszłości.Oto github: https://github.com/Tyler-Hardin/thread_pool .
źródło
Jest to kolejna implementacja puli wątków, która jest bardzo prosta, łatwa do zrozumienia i użycia, używa tylko standardowej biblioteki C ++ 11 i może być przeglądana lub modyfikowana do twoich zastosowań, powinna być dobrym początkiem, jeśli chcesz zacząć używać wątków totalizator piłkarski:
https://github.com/progschj/ThreadPool
źródło
Pula wątków bez zależności poza STL jest całkowicie możliwa. Niedawno napisałem małą bibliotekę wątków z samym nagłówkiem, aby rozwiązać ten sam problem. Obsługuje dynamiczną zmianę rozmiaru puli (zmianę liczby pracowników w czasie wykonywania), oczekiwanie, zatrzymywanie, wstrzymywanie, wznawianie i tak dalej. Mam nadzieję, że uznasz to za przydatne.
źródło