Search
Práce s jedním vláknem bývá pohodlnější, snažší a bezpečnější nežli práce s více vlákny, ale často je stejně potřeba jich použít víc.
Použití vláken se dá nahrubo rozdělit do 3 kategorií
V tomto My si vyzkoušíme jednoduché varianty prvních dvou kategorií 1) za pomoci nástrojů z C++ standardní knihovny.
Na tomto cvičení začneme s prázdným .cpp souborem.
.cpp
Začneme s jednoduchým příkladem, počítadlem, jako zástupcem 1. kategorie. Tento kód by měl logicky vypsat 0, ale často to tak není.
Proč? Co vypíše na vašem počítači?2)
#include <thread> #include <iostream> int main() { int counter = 0; auto thread_func = [&counter]() { for (int i = 0; i < 10000000; ++i) { counter++; counter--; } }; std::thread t1(thread_func); std::thread t2(thread_func); t1.join(); t2.join(); std::cout << counter << '\n'; }
std::mutex
std::atomic<int>
atomic_counter
int value()
void increment()
Řešení
Další častý problém je, že i pokud jsou jednotlivé kroky samostatně atomické, nemusíme z nich být schopni postavit kompletní algoritmus. Zkuste si spustit tento útržek:
#include <thread> #include <iostream> #include <vector> int main() { atomic_counter counter; auto thread_func = [&counter] () { while (counter.value() < 100) { counter.increment(); } }; std::vector<std::thread> threads; for (int i = 0; i < 4; ++i) { threads.emplace_back(thread_func); } for (auto& thread : threads) { thread.join(); } std::cout << counter.value() << '\n'; }
Pravděpodobně vám vypíše 100, tak jak byste čekali. Toto krásně ilustruje problém s vlákny, protože tento výsledek nemusíte dostat vždy.
100
bool increment_if_less_than(int)
2. kategorie vícevláknových kódů je postavena na frontách. Protože C++ standardní knihovna neposkytuje více-vláknovou frontu, postavíme si jednu na cvičení. Fronta, kterou budeme programovat, je tzv. blokující. To znamená, že pokud se z ní nějaké vlákno pokusí vytáhnout prvek ve chvíli kdy je prázdná, tak danému vláknu bude přerušena exekuce dokud do fronty není nějaký prvek vložen.
Začneme s kostrou fronty, která sice je thread-safe, ale pokud je prázdná, tak dojde k chybě.
#include <queue> #include <mutex> #include <thread> #include <iostream> #include <chrono> template <typename T> class blocking_queue { public: void add(T const& e) { std::unique_lock<std::mutex> lg(mutex); queue.push(e); } T take() { std::unique_lock<std::mutex> lg(mutex); auto out = std::move(queue.front()); queue.pop(); return out; } private: std::queue<T> queue; std::mutex mutex; }; int main() { using namespace std::chrono_literals; blocking_queue<int> queue; auto producer = [] (blocking_queue<int>& queue) { for (int i = 0; i < 100; ++i) { queue.add(i); std::this_thread::sleep_for(100ms); } }; auto consumer = [] (blocking_queue<int>& queue) { while (true) { auto elem = queue.take(); std::cout << elem << '\n'; } }; std::thread t1(producer, std::ref(queue)); std::thread t2(consumer, std::ref(queue)); t1.join(); t2.join(); }
take
K čekání se vám bude hodit std::condition_variable.
Teď již nedochází k chybě pokud se volá take() na prázdné frontě, ale dochází k jinému problému: program se nikdy neukončí, protože consumer vlákno nikdy nedoběhne. Vyřešíme to tak, že zavedeme koncept zavírání fronty – do zavřené fronty nelze vkládat další prvky a volání take signalizuje jestli byl získán prvek, nebo jestli je fronta zavřená a prázdná.
take()
consumer
closing
void close()
true
add
producer
close
Naše nová fronta teď umí signalizovat konzumentům kdy už z ní nejde nic získat, takže náš program časem doběhne. Stále ale ještě budeme chtít udělat jednu změnu, specificky budeme chtít, aby fronta nemohla být příliš veliká – pokud by se třeba zasekl konzument, nechceme aby producent přidával nové prvky do nekonečna.
blocking_bounded_queue
Námět k zamyšlení: Pokud bude do naší fronty zapisovat více vláken a číst z ní bude též více vláken, dojde k chybě?
Trik který jsme použili pro exekuci arbitrární logiky uvnitř atomic_counter se dá zobecnit do šablony na arbitrárním typem. Zkusíme si tedy teď vytvořit třídu pojmenovanou synchronized, která nám umožní synchronizovat arbitrární exekuci nad libovolnou třídou.
synchronized
synchronized<T>
std::cout
V prvním příkladu jsme měli problém, kdy jsme nedostatečně synchronizovali zápis do stejné paměti. Tomuto problému se říká race condition a patří mezi časté potíže při použití více vláken. Každý kdo pracuje s více vlákny, by ještě měl vědět o tzv. uváznutí (deadlock) a livelocku.
Deadlock znamená, že program nepostupuje a vlákna nikdy nedoběhnou. Dochází k němu když na sebe vzájemně čeká několik vláken, jako například v tomto útržku:
#include <thread> #include <mutex> #include <iostream> int main() { std::mutex m1; std::mutex m2; std::thread t1([&m1, &m2](){ for (int i = 0; i < 1000000; ++i) { std::unique_lock<std::mutex> l1(m1); std::cout << "Vlakno 1 rika: "; std::cout.flush(); // vynutí výpis std::unique_lock<std::mutex> l2(m2); std::cout << "ahoj!\n"; } }); std::thread t2([&m1, &m2]() { for (int i = 0; i < 1000000; ++i) { std::unique_lock<std::mutex> l2(m2); std::cout << "Vlakno 2 rika: "; std::cout.flush(); // vynutí výpis std::unique_lock<std::mutex> l1(m1); std::cout << "ahoj!\n"; } }); t1.join(); t2.join(); }
Na Livelock se můžeme dívat jako na slabší verzi uváznutí. Vlákna sice postupují, ale program pobíhá signifikantně pomaleji, nežli by probíhal v jednom vláknu. K livelocku dochází například pokud se více vláken snaží zamknout stejné mutexy, ale po čase je zase odemkne, jako například v ukázce:
#include <string> #include <mutex> #include <thread> #include <chrono> #include <iostream> using namespace std::chrono_literals; int main() { std::mutex m1; std::mutex m2; std::thread t1([&m1, &m2](){ int count = 0; for (int i = 0; i < 1000000; ++i) { std::unique_lock<std::mutex> l1(m1); for (int j = 0; j < 5; ++j) { if (m2.try_lock()) { std::cout << "Vlakno 1: " << ++count << '\n'; m2.unlock(); break; } else { std::this_thread::sleep_for(2ms); } } } }); std::thread t2([&m1, &m2]() { int count = 0; for (int i = 0; i < 1000000; ++i) { std::unique_lock<std::mutex> l2(m2); for (int j = 0; j < 7; ++j) { if (m1.try_lock()) { std::cout << "Vlakno 2: " << ++count << '\n'; m1.unlock(); break; } else { std::this_thread::sleep_for(3ms); } } } }); t1.join(); t2.join(); }
Podobně jako jsme implementovali synchronized<T>, které zaručovalo synchronizovaný přístup k nějakému T, můžeme naimplementovat concurrent<T>, které zaručí neblokující vykonávání kódu nad nějakým T. Specificky, kód se bude vykonávat na vlastním vláknu.
T
concurrent<T>
counter++; counter–;
std::unique_lock