Search
Práce s jedním vláknem bývá pohodlnější, snazší a bezpečnější nežli práce s více vlákny, ale často je stejně potřeba jich použít víc.
Použití vláken se dá nahrubo rozdělit do 3 kategorií
V tomto My si vyzkoušíme jednoduché varianty prvních dvou kategorií 1) za pomoci nástrojů z C++ standardní knihovny.
Na tomto cvičení začneme s prázdným .cpp souborem.
.cpp
Začneme s jednoduchým příkladem, počítadlem, jako zástupcem 1. kategorie. Tento kód by měl logicky vypsat 0, ale často to tak není.
Proč? Co vypíše na vašem počítači?2)
#include <thread> #include <iostream> int main() { int counter = 0; auto thread_func = [&counter]() { for (int i = 0; i < 10000000; ++i) { counter++; counter--; } }; std::thread t1(thread_func); std::thread t2(thread_func); t1.join(); t2.join(); std::cout << counter << '\n'; }
std::mutex
std::atomic<int>
atomic_counter
int value()
void increment()
Řešení
Další častý problém je, že i pokud jsou jednotlivé kroky samostatně atomické, nemusíme z nich být schopni postavit kompletní algoritmus. Zkuste si spustit tento útržek:
#include <thread> #include <iostream> #include <vector> int main() { atomic_counter counter; auto thread_func = [&counter] () { while (counter.value() < 100) { counter.increment(); } }; std::vector<std::thread> threads; for (int i = 0; i < 4; ++i) { threads.emplace_back(thread_func); } for (auto& thread : threads) { thread.join(); } std::cout << counter.value() << '\n'; }
Pravděpodobně vám vypíše 100, tak jak byste čekali. Toto krásně ilustruje problém s vlákny, protože tento výsledek nemusíte dostat vždy.
100
bool increment_if_less_than(int)
2. kategorie vícevláknových kódů je postavena na frontách. Protože C++ standardní knihovna neposkytuje více-vláknovou frontu, postavíme si jednu na cvičení. Fronta, kterou budeme programovat, je tzv. blokující. To znamená, že pokud se z ní nějaké vlákno pokusí vytáhnout prvek ve chvíli kdy je prázdná, tak danému vláknu bude přerušena exekuce dokud do fronty není nějaký prvek vložen.
Začneme s kostrou fronty, která sice je thread-safe, ale pokud je prázdná, tak dojde k chybě.
#include <queue> #include <mutex> #include <thread> #include <iostream> #include <chrono> template <typename T> class blocking_queue { public: void add(T const& e) { std::unique_lock<std::mutex> lg(mutex); queue.push(e); } T take() { std::unique_lock<std::mutex> lg(mutex); auto out = std::move(queue.front()); queue.pop(); return out; } private: std::queue<T> queue; std::mutex mutex; }; int main() { using namespace std::chrono_literals; blocking_queue<int> queue; auto producer = [] (blocking_queue<int>& queue) { for (int i = 0; i < 100; ++i) { queue.add(i); std::this_thread::sleep_for(100ms); } }; auto consumer = [] (blocking_queue<int>& queue) { while (true) { auto elem = queue.take(); std::cout << elem << '\n'; } }; std::thread t1(producer, std::ref(queue)); std::thread t2(consumer, std::ref(queue)); t1.join(); t2.join(); }
Opravte frontu tak, aby se vlákno volající take zablokovalo, pokud je fronta prázdná.
take
K čekání se vám bude hodit std::condition_variable.
Teď již nedochází k chybě pokud se volá take() na prázdné frontě, ale dochází k jinému problému: program se nikdy neukončí, protože consumer vlákno nikdy nedoběhne. Vyřešíme to tak, že zavedeme koncept zavírání fronty – do zavřené fronty nelze vkládat další prvky a volání take signalizuje jestli byl získán prvek, nebo jestli je fronta zavřená a prázdná.
take()
consumer
closing
void close()
true
add
producer
close
Naše nová fronta teď umí signalizovat konzumentům kdy už z ní nejde nic získat, takže náš program časem doběhne. Stále ale ještě budeme chtít udělat jednu změnu, specificky budeme chtít, aby fronta nemohla být příliš veliká – pokud by se třeba zasekl konzument, nechceme aby producent přidával nové prvky do nekonečna.
blocking_bounded_queue
Námět k zamyšlení: Pokud bude do naší fronty zapisovat více vláken a číst z ní bude též více vláken, dojde k chybě?
Trik který jsme použili pro exekuci arbitrární logiky uvnitř atomic_counter se dá zobecnit do šablony na arbitrárním typem. Zkusíme si tedy teď vytvořit třídu pojmenovanou synchronized, která nám umožní synchronizovat arbitrární exekuci nad libovolnou třídou.
synchronized
synchronized<T>
std::cout
Vzájemnou synchronizaci vláken reprezentuje problém večeřících filosofů.4) „Představme si kulatý stůl, na kterém je po obvodu položených 5 talířů. Mezi každými dvěma talíři je jedna čínská hůlka, celkem je jich tedy také 5. Dále si představme, že za tímto stolem sedí pět filozofů, každý za svým talířem. Filozof představuje nějaký proces a může provádět dvě činnosti – buď obědvat (eat), nebo filozofovat (thing). Aby mohl obědvat, musí si vzít dvě čínské hůlky. Pokud chce filozofovat, nedrží ani jednu hůlku.“
Začneme s třídou reprezentující problém večeřících filozofů.
#include <array> #include <thread> #include <chrono> #include <iostream> #include <string> #include <random> #include <sstream> constexpr int no_of_philosophers = 5; bool locked[no_of_philosophers] = {false, false, false, false, false}; class philosopher { public: philosopher(std::string n, int ll, int rr) : name(std::move(n)), phil(&philosopher::dine, this), lf(ll), rf(rr) { } ~philosopher() { phil.join(); } void print(std::string text, bool is_locked) { std::stringstream ss; ss << name << text << "\t"; if (is_locked) for (int i = 0; i < no_of_philosophers; i++) if (locked[i]) ss << " " << i + 1; ss << std::endl; std::cout << ss.str(); } void dine() { for (int i = 0; i < 10; i++) { think(); eat(); } } void eat() { locked[lf] = true; locked[rf] = true; print(" started eating.", true); static std::uniform_int_distribution<int> dist(1, 6); std::this_thread::sleep_for(std::chrono::microseconds(dist(rng) * 5)); print(" finished eating.", true); locked[lf] = false; locked[rf] = false; } void think() { static std::uniform_int_distribution<int> dist(1, 6); std::this_thread::sleep_for(std::chrono::microseconds(dist(rng) * 15)); print(" is thinking.", false); } private: std::string const name; std::thread phil; int lf, rf; std::mt19937 rng{std::random_device{}()}; }; int main() { std::array<philosopher, no_of_philosophers> philosophers {{ { "Aristotle", 0, 1}, { "Platon", 1, 2}, { "Descartes", 2, 3}, { "Kant", 3, 4}, { "Nietzsche", 4, 0}}}; return 0; }
V prvním příkladu jsme měli problém, kdy jsme nedostatečně synchronizovali zápis do stejné paměti. Tomuto problému se říká race condition a patří mezi časté potíže při použití více vláken. Každý kdo pracuje s více vlákny, by ještě měl vědět o tzv. uváznutí (deadlock) a livelocku.
Deadlock znamená, že program nepostupuje a vlákna nikdy nedoběhnou. Dochází k němu když na sebe vzájemně čeká několik vláken, jako například v tomto útržku:
#include <thread> #include <mutex> #include <iostream> int main() { std::mutex m1; std::mutex m2; std::thread t1([&m1, &m2](){ for (int i = 0; i < 1000000; ++i) { std::unique_lock<std::mutex> l1(m1); std::cout << "Vlakno 1 rika: "; std::cout.flush(); // vynutí výpis std::unique_lock<std::mutex> l2(m2); std::cout << "ahoj!\n"; } }); std::thread t2([&m1, &m2]() { for (int i = 0; i < 1000000; ++i) { std::unique_lock<std::mutex> l2(m2); std::cout << "Vlakno 2 rika: "; std::cout.flush(); // vynutí výpis std::unique_lock<std::mutex> l1(m1); std::cout << "ahoj!\n"; } }); t1.join(); t2.join(); }
Na Livelock se můžeme dívat jako na slabší verzi uváznutí. Vlákna sice postupují, ale program pobíhá signifikantně pomaleji, nežli by probíhal v jednom vláknu. K livelocku dochází například pokud se více vláken snaží zamknout stejné mutexy, ale po čase je zase odemkne, jako například v ukázce:
#include <string> #include <mutex> #include <thread> #include <chrono> #include <iostream> using namespace std::chrono_literals; int main() { std::mutex m1; std::mutex m2; std::thread t1([&m1, &m2](){ int count = 0; for (int i = 0; i < 1000000; ++i) { std::unique_lock<std::mutex> l1(m1); for (int j = 0; j < 5; ++j) { if (m2.try_lock()) { std::cout << "Vlakno 1: " << ++count << '\n'; m2.unlock(); break; } else { std::this_thread::sleep_for(2ms); } } } }); std::thread t2([&m1, &m2]() { int count = 0; for (int i = 0; i < 1000000; ++i) { std::unique_lock<std::mutex> l2(m2); for (int j = 0; j < 7; ++j) { if (m1.try_lock()) { std::cout << "Vlakno 2: " << ++count << '\n'; m1.unlock(); break; } else { std::this_thread::sleep_for(3ms); } } } }); t1.join(); t2.join(); }
Podobně jako jsme implementovali synchronized<T>, které zaručovalo synchronizovaný přístup k nějakému T, můžeme naimplementovat concurrent<T>, které zaručí neblokující vykonávání kódu nad nějakým T. Specificky, kód se bude vykonávat na vlastním vláknu.
T
concurrent<T>
counter++; counter–;
std::unique_lock