Práce s jedním vláknem bývá pohodlnější, snazší a bezpečnější nežli práce s více vlákny, ale často je stejně potřeba jich použít víc.
Použití vláken se dá nahrubo rozdělit do 3 kategorií
V tomto My si vyzkoušíme jednoduché varianty prvních dvou kategorií 1) za pomoci nástrojů z C++ standardní knihovny.
Na tomto cvičení začneme s prázdným .cpp
souborem.
Začneme s jednoduchým příkladem, počítadlem, jako zástupcem 1. kategorie. Tento kód by měl logicky vypsat 0, ale často to tak není.
Proč? Co vypíše na vašem počítači?2)
#include <thread> #include <iostream> int main() { int counter = 0; auto thread_func = [&counter]() { for (int i = 0; i < 10000000; ++i) { counter++; counter--; } }; std::thread t1(thread_func); std::thread t2(thread_func); t1.join(); t2.join(); std::cout << counter << '\n'; }
std::mutex
3)?
std::atomic<int>
?
atomic_counter
?
int value()
a void increment()
.
Další častý problém je, že i pokud jsou jednotlivé kroky samostatně atomické, nemusíme z nich být schopni postavit kompletní algoritmus. Zkuste si spustit tento útržek:
#include <thread> #include <iostream> #include <vector> int main() { atomic_counter counter; auto thread_func = [&counter] () { while (counter.value() < 100) { counter.increment(); } }; std::vector<std::thread> threads; for (int i = 0; i < 4; ++i) { threads.emplace_back(thread_func); } for (auto& thread : threads) { thread.join(); } std::cout << counter.value() << '\n'; }
Pravděpodobně vám vypíše 100
, tak jak byste čekali. Toto krásně
ilustruje problém s vlákny, protože tento výsledek nemusíte dostat vždy.
bool increment_if_less_than(int)
, která tento problém opraví.
2. kategorie vícevláknových kódů je postavena na frontách. Protože C++ standardní knihovna neposkytuje více-vláknovou frontu, postavíme si jednu na cvičení. Fronta, kterou budeme programovat, je tzv. blokující. To znamená, že pokud se z ní nějaké vlákno pokusí vytáhnout prvek ve chvíli kdy je prázdná, tak danému vláknu bude přerušena exekuce dokud do fronty není nějaký prvek vložen.
Začneme s kostrou fronty, která sice je thread-safe, ale pokud je prázdná, tak dojde k chybě.
#include <queue> #include <mutex> #include <thread> #include <iostream> #include <chrono> template <typename T> class blocking_queue { public: void add(T const& e) { std::unique_lock<std::mutex> lg(mutex); queue.push(e); } T take() { std::unique_lock<std::mutex> lg(mutex); auto out = std::move(queue.front()); queue.pop(); return out; } private: std::queue<T> queue; std::mutex mutex; }; int main() { using namespace std::chrono_literals; blocking_queue<int> queue; auto producer = [] (blocking_queue<int>& queue) { for (int i = 0; i < 100; ++i) { queue.add(i); std::this_thread::sleep_for(100ms); } }; auto consumer = [] (blocking_queue<int>& queue) { while (true) { auto elem = queue.take(); std::cout << elem << '\n'; } }; std::thread t1(producer, std::ref(queue)); std::thread t2(consumer, std::ref(queue)); t1.join(); t2.join(); }
Opravte frontu tak, aby se vlákno volající take
zablokovalo, pokud je fronta prázdná.
K čekání se vám bude hodit std::condition_variable.
Teď již nedochází k chybě pokud se volá take()
na prázdné frontě, ale
dochází k jinému problému: program se nikdy neukončí, protože consumer
vlákno nikdy nedoběhne. Vyřešíme to tak, že zavedeme koncept zavírání
fronty – do zavřené fronty nelze vkládat další prvky a volání take
signalizuje jestli byl získán prvek, nebo jestli je fronta zavřená a
prázdná.
closing
a metodu void close()
, která ji nastaví na true
a probudí všechna vlákna
add
tak aby signalizovala jestli ke vložení opravdu došlo, a v případě zavřené fronty prvek nevkládala
take
producer
volání metody close
consumer
tak aby se čtecí smyčka ukončila když je fronta zavřená
Naše nová fronta teď umí signalizovat konzumentům kdy už z ní nejde nic získat, takže náš program časem doběhne. Stále ale ještě budeme chtít udělat jednu změnu, specificky budeme chtít, aby fronta nemohla být příliš veliká – pokud by se třeba zasekl konzument, nechceme aby producent přidával nové prvky do nekonečna.
add
kontrolu velikosti, která případně zablokuje vkládající vlákno
close
probudí i vlákna uspaná uvnitř add
blocking_bounded_queue
Námět k zamyšlení: Pokud bude do naší fronty zapisovat více vláken a číst z ní bude též více vláken, dojde k chybě?
Trik který jsme použili pro exekuci arbitrární logiky uvnitř atomic_counter
se dá zobecnit do šablony na arbitrárním typem. Zkusíme si tedy teď vytvořit
třídu pojmenovanou synchronized
, která nám umožní synchronizovat
arbitrární exekuci nad libovolnou třídou.
synchronized<T>
std::cout
Vzájemnou synchronizaci vláken reprezentuje problém večeřících filosofů.4) „Představme si kulatý stůl, na kterém je po obvodu položených 5 talířů. Mezi každými dvěma talíři je jedna čínská hůlka, celkem je jich tedy také 5. Dále si představme, že za tímto stolem sedí pět filozofů, každý za svým talířem. Filozof představuje nějaký proces a může provádět dvě činnosti – buď obědvat (eat), nebo filozofovat (thing). Aby mohl obědvat, musí si vzít dvě čínské hůlky. Pokud chce filozofovat, nedrží ani jednu hůlku.“
Začneme s třídou reprezentující problém večeřících filozofů.
#include <array> #include <thread> #include <chrono> #include <iostream> #include <string> #include <random> #include <sstream> constexpr int no_of_philosophers = 5; bool locked[no_of_philosophers] = {false, false, false, false, false}; class philosopher { public: philosopher(std::string n, int ll, int rr) : name(std::move(n)), phil(&philosopher::dine, this), lf(ll), rf(rr) { } ~philosopher() { phil.join(); } void print(std::string text, bool is_locked) { std::stringstream ss; ss << name << text << "\t"; if (is_locked) for (int i = 0; i < no_of_philosophers; i++) if (locked[i]) ss << " " << i + 1; ss << std::endl; std::cout << ss.str(); } void dine() { for (int i = 0; i < 10; i++) { think(); eat(); } } void eat() { locked[lf] = true; locked[rf] = true; print(" started eating.", true); static std::uniform_int_distribution<int> dist(1, 6); std::this_thread::sleep_for(std::chrono::microseconds(dist(rng) * 5)); print(" finished eating.", true); locked[lf] = false; locked[rf] = false; } void think() { static std::uniform_int_distribution<int> dist(1, 6); std::this_thread::sleep_for(std::chrono::microseconds(dist(rng) * 15)); print(" is thinking.", false); } private: std::string const name; std::thread phil; int lf, rf; std::mt19937 rng{std::random_device{}()}; }; int main() { std::array<philosopher, no_of_philosophers> philosophers {{ { "Aristotle", 0, 1}, { "Platon", 1, 2}, { "Descartes", 2, 3}, { "Kant", 3, 4}, { "Nietzsche", 4, 0}}}; return 0; }Z výpisu programu je zřejmé, že hůlky jsou filozofy používány zcela náhodně. Upravte program tak, aby správně simuloval definovaný problém. Budete potřebovat mutexy pro jednotlivé vidličky a ty pak synchronně zamykat podle toho, který filozof právě večeří.
V prvním příkladu jsme měli problém, kdy jsme nedostatečně synchronizovali zápis do stejné paměti. Tomuto problému se říká race condition a patří mezi časté potíže při použití více vláken. Každý kdo pracuje s více vlákny, by ještě měl vědět o tzv. uváznutí (deadlock) a livelocku.
Deadlock znamená, že program nepostupuje a vlákna nikdy nedoběhnou. Dochází k němu když na sebe vzájemně čeká několik vláken, jako například v tomto útržku:
#include <thread> #include <mutex> #include <iostream> int main() { std::mutex m1; std::mutex m2; std::thread t1([&m1, &m2](){ for (int i = 0; i < 1000000; ++i) { std::unique_lock<std::mutex> l1(m1); std::cout << "Vlakno 1 rika: "; std::cout.flush(); // vynutí výpis std::unique_lock<std::mutex> l2(m2); std::cout << "ahoj!\n"; } }); std::thread t2([&m1, &m2]() { for (int i = 0; i < 1000000; ++i) { std::unique_lock<std::mutex> l2(m2); std::cout << "Vlakno 2 rika: "; std::cout.flush(); // vynutí výpis std::unique_lock<std::mutex> l1(m1); std::cout << "ahoj!\n"; } }); t1.join(); t2.join(); }
Na Livelock se můžeme dívat jako na slabší verzi uváznutí. Vlákna sice postupují, ale program pobíhá signifikantně pomaleji, nežli by probíhal v jednom vláknu. K livelocku dochází například pokud se více vláken snaží zamknout stejné mutexy, ale po čase je zase odemkne, jako například v ukázce:
#include <string> #include <mutex> #include <thread> #include <chrono> #include <iostream> using namespace std::chrono_literals; int main() { std::mutex m1; std::mutex m2; std::thread t1([&m1, &m2](){ int count = 0; for (int i = 0; i < 1000000; ++i) { std::unique_lock<std::mutex> l1(m1); for (int j = 0; j < 5; ++j) { if (m2.try_lock()) { std::cout << "Vlakno 1: " << ++count << '\n'; m2.unlock(); break; } else { std::this_thread::sleep_for(2ms); } } } }); std::thread t2([&m1, &m2]() { int count = 0; for (int i = 0; i < 1000000; ++i) { std::unique_lock<std::mutex> l2(m2); for (int j = 0; j < 7; ++j) { if (m1.try_lock()) { std::cout << "Vlakno 2: " << ++count << '\n'; m1.unlock(); break; } else { std::this_thread::sleep_for(3ms); } } } }); t1.join(); t2.join(); }
Podobně jako jsme implementovali synchronized<T>
, které zaručovalo
synchronizovaný přístup k nějakému T
, můžeme naimplementovat
concurrent<T>
, které zaručí neblokující vykonávání kódu nad
nějakým T
. Specificky, kód se bude vykonávat na vlastním vláknu.