Dnes si ukážeme, jak (potenciálně) zrychlit naše programy pomocí vláken. Pokud má náš stroj více procesorů, vlákna nám umožní vykonávat více práce najednou. Na dnešním cvičení začneme s prázdným .cpp
souborem.
“Ahoj, XXXXX! Jak se mas?\n”
na standardní výstup.
“Ahoj, OOOOO! Jak se mas?\n”
na standardní výstup.
std::thread
, který v konstruktoru očekává funkci, kterou má provést.
.join()
.
Pokud je objekt typu std::thread
zničen bez toho, aby se na něm zavolalo buďto .join()
(čekej na dokončení) nebo .detach()
(nečekej na dokončení), std::thread
je krajně nespokojený – shodí nám program. Podobně jako jinde v STL je možné vložit do konstruktoru std::thread
cokoliv, co připomíná funkci – např. funkční objekt nebo lambdu.
Funkce, která je spuštěna ve vlastním vlákně, může mít parametry. Druhý a další parametr konstruktoru std::thread
slouží k tomu, aby tyto parametry předal.
“XXXXX”
a “OOOOO”
.
auto
.
Co se teď stane, když spustíme program? Vidíme, že některé výpisy jsou teď navzájem pomíchané. Důvodem je, že teď každá iterace v každém vlákně použije třikrát výpis na standardní výstup, a tyto tři použití se mohou navzájem libovolně střídat.
std::mutex
, který budou obě vlákna sdílet, ale pouze jedno vlákno může mutex zamknout – druhé vlákno musí počkat.
std::mutex
funkci vlákna. Do capture listu lambda funkce přidejte referenci na mutex, nebo pošlete referenci na mutex dalším parametrem.
std::unique_lock<std::mutex>
. V destruktoru std::unique_lock
se mutex sám odemkne.
Nyní se nám výpisy přestanou prolínat, protože druhé vlákno nesmí začít vypisovat, dokud první vlákno nedokončí řádek.
Zkusme si pustit následující program:
#include <thread> #include <mutex> #include <iostream> int main() { std::mutex m1; std::mutex m2; std::thread t1([&m1, &m2](){ for (int i = 0; i < 1000000; ++i) { std::unique_lock<std::mutex> l1(m1); std::cout << "Vlakno 1 rika: "; std::cout.flush(); // vynutí výpis std::unique_lock<std::mutex> l2(m2); std::cout << "ahoj!\n"; } }); std::thread t2([&m1, &m2]() { for (int i = 0; i < 1000000; ++i) { std::unique_lock<std::mutex> l2(m2); std::cout << "Vlakno 2 rika: "; std::cout.flush(); // vynutí výpis std::unique_lock<std::mutex> l1(m1); std::cout << "ahoj!\n"; } }); t1.join(); t2.join(); }
Co se stane po jeho spuštění? Jak k tomu dojde? Jak to vyřešit?
Zkusme si pustit následující program:
#include <string> #include <mutex> #include <thread> #include <chrono> #include <iostream> using namespace std::chrono_literals; int main() { std::mutex m1; std::mutex m2; std::thread t1([&m1, &m2](){ int count = 0; for (int i = 0; i < 1000000; ++i) { std::unique_lock<std::mutex> l1(m1); for (int j = 0; j < 5; ++j) { if (m2.try_lock()) { std::cout << "Vlakno 1: " << ++count << '\n'; m2.unlock(); break; } else { std::this_thread::sleep_for(2ms); } } } }); std::thread t2([&m1, &m2]() { int count = 0; for (int i = 0; i < 1000000; ++i) { std::unique_lock<std::mutex> l2(m2); for (int j = 0; j < 7; ++j) { if (m1.try_lock()) { std::cout << "Vlakno 2: " << ++count << '\n'; m1.unlock(); break; } else { std::this_thread::sleep_for(3ms); } } } }); t1.join(); t2.join(); }
Co se stane po jeho spuštění? Jak k tomu dojde? Jak to vyřešit?
Zkusme si pustit následující program:
#include <thread> #include <iostream> int main() { int counter = 0; auto thread_func = [&counter]() { for (int i = 0; i < 10000000; ++i) { counter++; counter--; } }; std::thread t1(thread_func); std::thread t2(thread_func); t1.join(); t2.join(); std::cout << counter << '\n'; }
Co se stane po jeho spuštění? Jak k tomu dojde? Jak to vyřešit?
std::mutex
.
counter
za std::atomic<int>
.
atomic_counter
, která má metody .value()
, .increment()
a .decrement()
a která obsahuje int
a std::mutex
.
Vyzkoušejte třídu atomic_counter
na této funkci main
:
int main() { atomic_counter counter; auto thread_func = [&counter]() { while (counter.value() < 5) { counter.increment(); } }; std::thread t1(thread_func); std::thread t2(thread_func); t1.join(); t2.join(); std::cout << counter.value() << '\n'; }
Co je špatně s touto funkcí main
? Co s tím?
atomic_counter
metodu .increment_if_less_than(int)
.
Vidíte nějakou vadu v našem řešení problému? Představte si, že chceme, aby vlákna provedla nějakou akci přesně 5-krát. Co s tím?
increment_if_less_than
návratovou hodnotu typu bool
, která nám sdělí, zda se inkrementace povedla.
Asi tušíte, že pořád naše řešení není příliš obecné. Vždy, když budeme chtít, aby počítadlo umělo něco nového, zjistíme, že budeme potřebovat novou metodu. Co když budeme chtít přičítat víc, než 1? Co když budeme chtít odčítat? Co když budeme chtít něco provést, pouze pokud je hodnota počítadla dělitelná třemi? Možnosti jsou různorodé a každé použití by si vyžádalo novou metodu.
atomic_counter
libovolnou podmínku a v závislosti na úspěchu provést libovolnou akci.
Podíváme se na použití vláken pro složitější úlohy. Většina vláknových problémů se dá napasovat na dobře známé a řešené problémy. My se podíváme na implementaci producent-consumer problému, kde alespoň dvě vlákna sdílejí frontu s omezenou kapacitou.
Náš producent bude vytvářet čísla - inty a náš spotřebitel je bude z fronty konzumovat. Pokud chceme, můžeme si představit tato čísla jako indexy do jiné, nesynchronizované kolekce (Alternativně můžeme později nasadit šablony).
Základní schéma je jednoduché
class blocking_bounded_queue { void add(const ham& h); const ham& take(const ham& h); }; blocking_bounded_queue shared_queue; void producer() { while (true) { auto ham = produce_ham(); shared_queue.add(ham); } } void consumer() { while (true) { auto ham = shared_queue.take(); eat_ham(ham); } }ale tímto jsme pouze přesunuli složitost dovnitř
blocking_bounded_queue
, do funkcí add
a take
. Takže, jak naimplementovat blocking_bounded_queue
?
Dále máme i několik méně očividných požadavků, například chceme, aby volání funkce take
tzv. blokovalo (nedojde k návratu z funkce) dokud nelze z fronty něco vybrat a volání funkce add
blokovalo dokud nelze do fronty prvek přidat. To vyplývá ze struktury producenta a spotřebitele, kteří předpokládají, že tyto funkce prostě proběhnou. To znamená, že funkce take
a add
musejí mít na čem čekat (například std::condition_variable
).
Zde je kostra blocking_bounded_queue
, splňující první 4 podmínky. Jak doimplementujete funkce add
a take
, aby splňovala i poslední podmínku?
#include <mutex> #include <queue> class blocking_bounded_queue { public: void add(int i) { std::unique_lock<std::mutex> h {m}; // zabrani vstupu vice vlaken if (q.size() < capacity){ q.push(i); } else { // ? } } int take() { std::unique_lock<std::mutex> h {m}; // zabrani vstupu vice vlaken if (q.size() > 0){ int temp = q.front(); q.pop(); return temp; } else { // ? } } private: std::queue<int> queue; std::size_t size_limit; std::mutex mutex; };
Trik, který jsme použili abychom umožnili provést bezpečně libovolnou akci nad počítadlem, se dá generalizovat pro arbitrární typ. Takové šabloně se občas říká monitor
.
Jak se taková šablona implementuje?