Search
Dnes si ukážeme, jak (potenciálně) zrychlit naše programy pomocí vláken. Pokud má náš stroj více procesorů, vlákna nám umožní vykonávat více práce najednou. Na dnešním cvičení začneme s prázdným .cpp souborem.
.cpp
“Ahoj, XXXXX! Jak se mas?\n”
“Ahoj, OOOOO! Jak se mas?\n”
std::thread
.join()
Pokud je objekt typu std::thread zničen bez toho, aby se na něm zavolalo buďto .join() (čekej na dokončení) nebo .detach() (nečekej na dokončení), std::thread je krajně nespokojený – shodí nám program. Podobně jako jinde v STL je možné vložit do konstruktoru std::thread cokoliv, co připomíná funkci – např. funkční objekt nebo lambdu.
.detach()
Funkce, která je spuštěna ve vlastním vlákně, může mít parametry. Druhý a další parametr konstruktoru std::thread slouží k tomu, aby tyto parametry předal.
“XXXXX”
“OOOOO”
auto
Řešení
Co se teď stane, když spustíme program? Vidíme, že některé výpisy jsou teď navzájem pomíchané. Důvodem je, že teď každá iterace v každém vlákně použije třikrát výpis na standardní výstup, a tyto tři použití se mohou navzájem libovolně střídat.
std::mutex
std::unique_lock<std::mutex>
std::unique_lock
Nyní se nám výpisy přestanou prolínat, protože druhé vlákno nesmí začít vypisovat, dokud první vlákno nedokončí řádek.
Zkusme si pustit následující program:
#include <thread> #include <mutex> #include <iostream> int main() { std::mutex m1; std::mutex m2; std::thread t1([&m1, &m2](){ for (int i = 0; i < 1000000; ++i) { std::unique_lock<std::mutex> l1(m1); std::cout << "Vlakno 1 rika: "; std::cout.flush(); // vynutí výpis std::unique_lock<std::mutex> l2(m2); std::cout << "ahoj!\n"; } }); std::thread t2([&m1, &m2]() { for (int i = 0; i < 1000000; ++i) { std::unique_lock<std::mutex> l2(m2); std::cout << "Vlakno 2 rika: "; std::cout.flush(); // vynutí výpis std::unique_lock<std::mutex> l1(m1); std::cout << "ahoj!\n"; } }); t1.join(); t2.join(); }
Co se stane po jeho spuštění? Jak k tomu dojde? Jak to vyřešit?
#include <string> #include <mutex> #include <thread> #include <chrono> #include <iostream> using namespace std::chrono_literals; int main() { std::mutex m1; std::mutex m2; std::thread t1([&m1, &m2](){ int count = 0; for (int i = 0; i < 1000000; ++i) { std::unique_lock<std::mutex> l1(m1); for (int j = 0; j < 5; ++j) { if (m2.try_lock()) { std::cout << "Vlakno 1: " << ++count << '\n'; m2.unlock(); break; } else { std::this_thread::sleep_for(2ms); } } } }); std::thread t2([&m1, &m2]() { int count = 0; for (int i = 0; i < 1000000; ++i) { std::unique_lock<std::mutex> l2(m2); for (int j = 0; j < 7; ++j) { if (m1.try_lock()) { std::cout << "Vlakno 2: " << ++count << '\n'; m1.unlock(); break; } else { std::this_thread::sleep_for(3ms); } } } }); t1.join(); t2.join(); }
#include <thread> #include <iostream> int main() { int counter = 0; auto thread_func = [&counter]() { for (int i = 0; i < 10000000; ++i) { counter++; counter--; } }; std::thread t1(thread_func); std::thread t2(thread_func); t1.join(); t2.join(); std::cout << counter << '\n'; }
counter
std::atomic<int>
atomic_counter
.value()
.increment()
.decrement()
int
Vyzkoušejte třídu atomic_counter na této funkci main:
main
int main() { atomic_counter counter; auto thread_func = [&counter]() { while (counter.value() < 5) { counter.increment(); } }; std::thread t1(thread_func); std::thread t2(thread_func); t1.join(); t2.join(); std::cout << counter.value() << '\n'; }
Co je špatně s touto funkcí main? Co s tím?
.increment_if_less_than(int)
Vidíte nějakou vadu v našem řešení problému? Představte si, že chceme, aby vlákna provedla nějakou akci přesně 5-krát. Co s tím?
increment_if_less_than
bool
Asi tušíte, že pořád naše řešení není příliš obecné. Vždy, když budeme chtít, aby počítadlo umělo něco nového, zjistíme, že budeme potřebovat novou metodu. Co když budeme chtít přičítat víc, než 1? Co když budeme chtít odčítat? Co když budeme chtít něco provést, pouze pokud je hodnota počítadla dělitelná třemi? Možnosti jsou různorodé a každé použití by si vyžádalo novou metodu.
Podíváme se na použití vláken pro složitější úlohy. Většina vláknových problémů se dá napasovat na dobře známé a řešené problémy. My se podíváme na implementaci producent-consumer problému, kde alespoň dvě vlákna sdílejí frontu s omezenou kapacitou.
Náš producent bude vytvářet čísla - inty a náš spotřebitel je bude z fronty konzumovat. Pokud chceme, můžeme si představit tato čísla jako indexy do jiné, nesynchronizované kolekce (Alternativně můžeme později nasadit šablony).
Základní schéma je jednoduché
class blocking_bounded_queue { void add(const ham& h); const ham& take(const ham& h); }; blocking_bounded_queue shared_queue; void producer() { while (true) { auto ham = produce_ham(); shared_queue.add(ham); } } void consumer() { while (true) { auto ham = shared_queue.take(); eat_ham(ham); } }
blocking_bounded_queue
add
take
Dále máme i několik méně očividných požadavků, například chceme, aby volání funkce take tzv. blokovalo (nedojde k návratu z funkce) dokud nelze z fronty něco vybrat a volání funkce add blokovalo dokud nelze do fronty prvek přidat. To vyplývá ze struktury producenta a spotřebitele, kteří předpokládají, že tyto funkce prostě proběhnou. To znamená, že funkce take a add musejí mít na čem čekat (například std::condition_variable).
std::condition_variable
Zde je kostra blocking_bounded_queue, splňující první 4 podmínky. Jak doimplementujete funkce add a take, aby splňovala i poslední podmínku?
#include <mutex> #include <queue> class blocking_bounded_queue { public: void add(int i) { std::unique_lock<std::mutex> h {m}; // zabrani vstupu vice vlaken if (q.size() < capacity){ q.push(i); } else { // ? } } int take() { std::unique_lock<std::mutex> h {m}; // zabrani vstupu vice vlaken if (q.size() > 0){ int temp = q.front(); q.pop(); return temp; } else { // ? } } private: std::queue<int> queue; std::size_t size_limit; std::mutex mutex; };
Trik, který jsme použili abychom umožnili provést bezpečně libovolnou akci nad počítadlem, se dá generalizovat pro arbitrární typ. Takové šabloně se občas říká monitor.
monitor
Jak se taková šablona implementuje?