Práce s jedním vláknem bývá pohodlnější, snažší a bezpečnější nežli práce s více vlákny, ale často je stejně potřeba jich použít víc.
Použití vláken se dá nahrubo rozdělit do 3 kategorií
V tomto My si vyzkoušíme jednoduché varianty prvních dvou kategorií 1) za pomoci nástrojů z C++ standardní knihovny.
Na tomto cvičení začneme s prázdným .cpp
souborem.
Začneme s jednoduchým příkladem, počítadlem, jako zástupcem 1. kategorie. Tento kód by měl logicky vypsat 0, ale často to tak není.
Proč? Co vypíše na vašem počítači?2)
#include <thread> #include <iostream> int main() { int counter = 0; auto thread_func = [&counter]() { for (int i = 0; i < 10000000; ++i) { counter++; counter--; } }; std::thread t1(thread_func); std::thread t2(thread_func); t1.join(); t2.join(); std::cout << counter << '\n'; }
std::mutex
3)?
std::atomic<int>
?
atomic_counter
?
int value()
a void increment()
.
Další častý problém je, že i pokud jsou jednotlivé kroky samostatně atomické, nemusíme z nich být schopni postavit kompletní algoritmus. Zkuste si spustit tento útržek:
#include <thread> #include <iostream> #include <vector> int main() { atomic_counter counter; auto thread_func = [&counter] () { while (counter.value() < 100) { counter.increment(); } }; std::vector<std::thread> threads; for (int i = 0; i < 4; ++i) { threads.emplace_back(thread_func); } for (auto& thread : threads) { thread.join(); } std::cout << counter.value() << '\n'; }
Pravděpodobně vám vypíše 100
, tak jak byste čekali. Toto krásně
ilustruje problém s vlákny, protože tento výsledek nemusíte dostat vždy.
bool increment_if_less_than(int)
, která tento problém opraví.
2. kategorie vícevláknových kódů je postavena na frontách. Protože C++ standardní knihovna neposkytuje více-vláknovou frontu, postavíme si jednu na cvičení. Fronta, kterou budeme programovat, je tzv. blokující. To znamená, že pokud se z ní nějaké vlákno pokusí vytáhnout prvek ve chvíli kdy je prázdná, tak danému vláknu bude přerušena exekuce dokud do fronty není nějaký prvek vložen.
Začneme s kostrou fronty, která sice je thread-safe, ale pokud je prázdná, tak dojde k chybě.
#include <queue> #include <mutex> #include <thread> #include <iostream> #include <chrono> template <typename T> class blocking_queue { public: void add(T const& e) { std::unique_lock<std::mutex> lg(mutex); queue.push(e); } T take() { std::unique_lock<std::mutex> lg(mutex); auto out = std::move(queue.front()); queue.pop(); return out; } private: std::queue<T> queue; std::mutex mutex; }; int main() { using namespace std::chrono_literals; blocking_queue<int> queue; auto producer = [] (blocking_queue<int>& queue) { for (int i = 0; i < 100; ++i) { queue.add(i); std::this_thread::sleep_for(100ms); } }; auto consumer = [] (blocking_queue<int>& queue) { while (true) { auto elem = queue.take(); std::cout << elem << '\n'; } }; std::thread t1(producer, std::ref(queue)); std::thread t2(consumer, std::ref(queue)); t1.join(); t2.join(); }
take
zablokovalo, pokud je fronta prázdná.
K čekání se vám bude hodit std::condition_variable.
Teď již nedochází k chybě pokud se volá take()
na prázdné frontě, ale
dochází k jinému problému: program se nikdy neukončí, protože consumer
vlákno nikdy nedoběhne. Vyřešíme to tak, že zavedeme koncept zavírání
fronty – do zavřené fronty nelze vkládat další prvky a volání take
signalizuje jestli byl získán prvek, nebo jestli je fronta zavřená a
prázdná.
closing
a metodu void close()
, která ji nastaví na true
a probudí všechna vlákna
add
tak aby signalizovala jestli ke vložení opravdu došlo, a v případě zavřené fronty prvek nevkládala
take
producer
volání metody close
consumer
tak aby se čtecí smyčka ukončila když je fronta zavřená
Naše nová fronta teď umí signalizovat konzumentům kdy už z ní nejde nic získat, takže náš program časem doběhne. Stále ale ještě budeme chtít udělat jednu změnu, specificky budeme chtít, aby fronta nemohla být příliš veliká – pokud by se třeba zasekl konzument, nechceme aby producent přidával nové prvky do nekonečna.
add
kontrolu velikosti, která případně zablokuje vkládající vlákno
close
probudí i vlákna uspaná uvnitř add
blocking_bounded_queue
Námět k zamyšlení: Pokud bude do naší fronty zapisovat více vláken a číst z ní bude též více vláken, dojde k chybě?
Trik který jsme použili pro exekuci arbitrární logiky uvnitř atomic_counter
se dá zobecnit do šablony na arbitrárním typem. Zkusíme si tedy teď vytvořit
třídu pojmenovanou synchronized
, která nám umožní synchronizovat
arbitrární exekuci nad libovolnou třídou.
synchronized<T>
std::cout
V prvním příkladu jsme měli problém, kdy jsme nedostatečně synchronizovali zápis do stejné paměti. Tomuto problému se říká race condition a patří mezi časté potíže při použití více vláken. Každý kdo pracuje s více vlákny, by ještě měl vědět o tzv. uváznutí (deadlock) a livelocku.
Deadlock znamená, že program nepostupuje a vlákna nikdy nedoběhnou. Dochází k němu když na sebe vzájemně čeká několik vláken, jako například v tomto útržku:
#include <thread> #include <mutex> #include <iostream> int main() { std::mutex m1; std::mutex m2; std::thread t1([&m1, &m2](){ for (int i = 0; i < 1000000; ++i) { std::unique_lock<std::mutex> l1(m1); std::cout << "Vlakno 1 rika: "; std::cout.flush(); // vynutí výpis std::unique_lock<std::mutex> l2(m2); std::cout << "ahoj!\n"; } }); std::thread t2([&m1, &m2]() { for (int i = 0; i < 1000000; ++i) { std::unique_lock<std::mutex> l2(m2); std::cout << "Vlakno 2 rika: "; std::cout.flush(); // vynutí výpis std::unique_lock<std::mutex> l1(m1); std::cout << "ahoj!\n"; } }); t1.join(); t2.join(); }
Na Livelock se můžeme dívat jako na slabší verzi uváznutí. Vlákna sice postupují, ale program pobíhá signifikantně pomaleji, nežli by probíhal v jednom vláknu. K livelocku dochází například pokud se více vláken snaží zamknout stejné mutexy, ale po čase je zase odemkne, jako například v ukázce:
#include <string> #include <mutex> #include <thread> #include <chrono> #include <iostream> using namespace std::chrono_literals; int main() { std::mutex m1; std::mutex m2; std::thread t1([&m1, &m2](){ int count = 0; for (int i = 0; i < 1000000; ++i) { std::unique_lock<std::mutex> l1(m1); for (int j = 0; j < 5; ++j) { if (m2.try_lock()) { std::cout << "Vlakno 1: " << ++count << '\n'; m2.unlock(); break; } else { std::this_thread::sleep_for(2ms); } } } }); std::thread t2([&m1, &m2]() { int count = 0; for (int i = 0; i < 1000000; ++i) { std::unique_lock<std::mutex> l2(m2); for (int j = 0; j < 7; ++j) { if (m1.try_lock()) { std::cout << "Vlakno 2: " << ++count << '\n'; m1.unlock(); break; } else { std::this_thread::sleep_for(3ms); } } } }); t1.join(); t2.join(); }
Podobně jako jsme implementovali synchronized<T>
, které zaručovalo
synchronizovaný přístup k nějakému T
, můžeme naimplementovat
concurrent<T>
, které zaručí neblokující vykonávání kódu nad
nějakým T
. Specificky, kód se bude vykonávat na vlastním vláknu.