{{page>courses:b6b36pjc:styles#common&noheader&nofooter}} {{page>courses:b6b36pjc:styles#cviceni&noheader&nofooter}} ======= Cvičení 11: Vlákna a paralelismus ======= {{ :courses:b6b36pcc:cviceni:pruvodce.png?90|Průvodce studiem}} Práce s jedním vláknem bývá pohodlnější, snazší a bezpečnější nežli práce s více vlákny, ale často je stejně potřeba jich použít víc. Použití vláken se dá nahrubo rozdělit do 3 kategorií * Rozdělení stejné práce mezi N vláken * Rozdělení zpracování dat do N kroků (Jedno vlákno načítá data, další je zpracovává) * Zmenšení latence zpracování vstupů (GUI má obvykle separátní vlákno) V tomto My si vyzkoušíme jednoduché varianty prvních dvou kategorií ((3. kategorie je v zásadě speciální varianta 2. kategorie)) za pomoci nástrojů z C%%++%% standardní knihovny. Na tomto cvičení začneme s prázdným ''.cpp'' souborem. ===== Počítadlo ===== Začneme s jednoduchým příkladem, počítadlem, jako zástupcem 1. kategorie. Tento kód by měl logicky vypsat 0, ale často to tak není. **Proč? Co vypíše na vašem počítači?**((Pokud kód zkompilujete v release módu, tj. s optimalizacemi, může se stát, že překladač usoudí, že ''counter%%++%%; counter--;'' nemá vlastně žádný význam a tento kód vyhodí.)) #include #include int main() { int counter = 0; auto thread_func = [&counter]() { for (int i = 0; i < 10000000; ++i) { counter++; counter--; } }; std::thread t1(thread_func); std::thread t2(thread_func); t1.join(); t2.join(); std::cout << counter << '\n'; } * Jak toto opravit pomocí ''std::mutex''((A ''std::unique_lock''))? * Jak toto opravit pomocí ''std::atomic''? * Jak toto opravit vlastní třídou ''atomic_counter''? * Datovými položkami by měl být vlastní counter a mutex. * Implementujte metody ''int value()'' a ''void increment()''. {{:courses:b6b36pcc:cviceni:cviceni_11_counter.zip|Řešení}} Další častý problém je, že i pokud jsou jednotlivé kroky samostatně atomické, nemusíme z nich být schopni postavit kompletní algoritmus. Zkuste si spustit tento útržek: #include #include #include int main() { atomic_counter counter; auto thread_func = [&counter] () { while (counter.value() < 100) { counter.increment(); } }; std::vector threads; for (int i = 0; i < 4; ++i) { threads.emplace_back(thread_func); } for (auto& thread : threads) { thread.join(); } std::cout << counter.value() << '\n'; } Pravděpodobně vám vypíše ''100'', tak jak byste čekali. Toto krásně ilustruje problém s vlákny, protože tento výsledek nemusíte dostat vždy. * Přidejte vašemu počítadlu metodu ''bool increment_if_less_than(int)'', která tento problém opraví. {{:courses:b6b36pcc:cviceni:cviceni_11_counter2.zip|Řešení}} * Můžeme nějak vyhodnotit libovolnou podmínku a provést libovolnou akci, aniž bychom pokaždé přidávali novou metodu? {{:courses:b6b36pcc:cviceni:cviceni_11_counter3.zip|Řešení}} ===== Fronta ===== 2. kategorie vícevláknových kódů je postavena na frontách. Protože C%%++%% standardní knihovna neposkytuje více-vláknovou frontu, postavíme si jednu na cvičení. Fronta, kterou budeme programovat, je tzv. blokující. To znamená, že pokud se z ní nějaké vlákno pokusí vytáhnout prvek ve chvíli kdy je prázdná, tak danému vláknu bude přerušena exekuce dokud do fronty není nějaký prvek vložen. Začneme s kostrou fronty, která sice je thread-safe, ale pokud je prázdná, tak dojde k chybě. #include #include #include #include #include template class blocking_queue { public: void add(T const& e) { std::unique_lock lg(mutex); queue.push(e); } T take() { std::unique_lock lg(mutex); auto out = std::move(queue.front()); queue.pop(); return out; } private: std::queue queue; std::mutex mutex; }; int main() { using namespace std::chrono_literals; blocking_queue queue; auto producer = [] (blocking_queue& queue) { for (int i = 0; i < 100; ++i) { queue.add(i); std::this_thread::sleep_for(100ms); } }; auto consumer = [] (blocking_queue& queue) { while (true) { auto elem = queue.take(); std::cout << elem << '\n'; } }; std::thread t1(producer, std::ref(queue)); std::thread t2(consumer, std::ref(queue)); t1.join(); t2.join(); } {{ :courses:b6b36pcc:cviceni:ukoly.png?90|Úkoly k procvičení}} Opravte frontu tak, aby se vlákno volající ''take'' zablokovalo, pokud je fronta prázdná. K čekání se vám bude hodit [[https://en.cppreference.com/w/cpp/thread/condition_variable|std::condition_variable]]. {{:courses:b6b36pcc:cviceni:cviceni_11_queue-1.zip|Řešení}} Teď již nedochází k chybě pokud se volá ''take()'' na prázdné frontě, ale dochází k jinému problému: program se nikdy neukončí, protože ''consumer'' vlákno nikdy nedoběhne. Vyřešíme to tak, že zavedeme koncept zavírání fronty -- do zavřené fronty nelze vkládat další prvky a volání ''take'' signalizuje jestli byl získán prvek, nebo jestli je fronta zavřená a prázdná. * Přidejte frontě proměnnou ''closing'' a metodu ''void close()'', která ji nastaví na ''true'' a probudí všechna vlákna * Zmeňte metodu ''add'' tak aby signalizovala jestli ke vložení opravdu došlo, a v případě zavřené fronty prvek nevkládala * Podobným způsobem změňte ''take'' * Přidejte do funkce ''producer'' volání metody ''close'' * Upravte funkci ''consumer'' tak aby se čtecí smyčka ukončila když je fronta zavřená {{:courses:b6b36pcc:cviceni:cviceni_11_queue-2.zip|Řešení}} Naše nová fronta teď umí signalizovat konzumentům kdy už z ní nejde nic získat, takže náš program časem doběhne. Stále ale ještě budeme chtít udělat jednu změnu, specificky budeme chtít, aby fronta nemohla být příliš veliká -- pokud by se třeba zasekl konzument, nechceme aby producent přidával nové prvky do nekonečna. * Přidejte frontě konstruktor, který bere maximální velikost * Přidejte do ''add'' kontrolu velikosti, která případně zablokuje vkládající vlákno * Zkontrolujte zda ''close'' probudí i vlákna uspaná uvnitř ''add'' * Přejmenujte frontu na ''blocking_bounded_queue'' {{:courses:b6b36pcc:cviceni:cviceni_11_queue-3.zip|Řešení}} Námět k zamyšlení: **Pokud bude do naší fronty zapisovat více vláken a číst z ní bude též více vláken, dojde k chybě?** {{ :courses:b6b36pcc:cviceni:zajemce.png?90|Pro zájemce}} ===== Synchronized ===== Trik který jsme použili pro exekuci arbitrární logiky uvnitř ''atomic_counter'' se dá zobecnit do šablony na arbitrárním typem. Zkusíme si tedy teď vytvořit třídu pojmenovanou ''synchronized'', která nám umožní synchronizovat arbitrární exekuci nad libovolnou třídou. * Vytvořte šablonovou třídu ''synchronized'' * Použijte ji k synchronizaci více-vláknového zápisu na ''std::cout'' {{:courses:b6b36pcc:cviceni:cviceni_11_synchronized.zip|Řešení}} ===== Synchronizace nad více proměnnými ===== Vzájemnou synchronizaci vláken reprezentuje problém večeřících filosofů.((https://cs.wikipedia.org/wiki/Probl%C3%A9m_ob%C4%9Bdvaj%C3%ADc%C3%ADch_filosof%C5%AF)) „Představme si kulatý stůl, na kterém je po obvodu položených 5 talířů. Mezi každými dvěma talíři je jedna čínská hůlka, celkem je jich tedy také 5. Dále si představme, že za tímto stolem sedí pět filozofů, každý za svým talířem. Filozof představuje nějaký proces a může provádět dvě činnosti – buď obědvat (eat), nebo filozofovat (thing). Aby mohl obědvat, musí si vzít dvě čínské hůlky. Pokud chce filozofovat, nedrží ani jednu hůlku.“ Začneme s třídou reprezentující problém večeřících filozofů. #include #include #include #include #include #include #include constexpr int no_of_philosophers = 5; bool locked[no_of_philosophers] = {false, false, false, false, false}; class philosopher { public: philosopher(std::string n, int ll, int rr) : name(std::move(n)), phil(&philosopher::dine, this), lf(ll), rf(rr) { } ~philosopher() { phil.join(); } void print(std::string text, bool is_locked) { std::stringstream ss; ss << name << text << "\t"; if (is_locked) for (int i = 0; i < no_of_philosophers; i++) if (locked[i]) ss << " " << i + 1; ss << std::endl; std::cout << ss.str(); } void dine() { for (int i = 0; i < 10; i++) { think(); eat(); } } void eat() { locked[lf] = true; locked[rf] = true; print(" started eating.", true); static std::uniform_int_distribution dist(1, 6); std::this_thread::sleep_for(std::chrono::microseconds(dist(rng) * 5)); print(" finished eating.", true); locked[lf] = false; locked[rf] = false; } void think() { static std::uniform_int_distribution dist(1, 6); std::this_thread::sleep_for(std::chrono::microseconds(dist(rng) * 15)); print(" is thinking.", false); } private: std::string const name; std::thread phil; int lf, rf; std::mt19937 rng{std::random_device{}()}; }; int main() { std::array philosophers {{ { "Aristotle", 0, 1}, { "Platon", 1, 2}, { "Descartes", 2, 3}, { "Kant", 3, 4}, { "Nietzsche", 4, 0}}}; return 0; } Z výpisu programu je zřejmé, že hůlky jsou filozofy používány zcela náhodně. Upravte program tak, aby správně simuloval definovaný problém. Budete potřebovat mutexy pro jednotlivé vidličky a ty pak synchronně zamykat podle toho, který filozof právě večeří. {{ :courses:b6b36pcc:cviceni:cviceni_11_philosophers.zip |Řešení}} ===== Ostatní problémy ===== V prvním příkladu jsme měli problém, kdy jsme nedostatečně synchronizovali zápis do stejné paměti. Tomuto problému se říká **race condition** a patří mezi časté potíže při použití více vláken. Každý kdo pracuje s více vlákny, by ještě měl vědět o tzv. uváznutí (**deadlock**) a **livelocku**. ==== Deadlock ==== Deadlock znamená, že program nepostupuje a vlákna nikdy nedoběhnou. Dochází k němu když na sebe vzájemně čeká několik vláken, jako například v tomto útržku: #include #include #include int main() { std::mutex m1; std::mutex m2; std::thread t1([&m1, &m2](){ for (int i = 0; i < 1000000; ++i) { std::unique_lock l1(m1); std::cout << "Vlakno 1 rika: "; std::cout.flush(); // vynutí výpis std::unique_lock l2(m2); std::cout << "ahoj!\n"; } }); std::thread t2([&m1, &m2]() { for (int i = 0; i < 1000000; ++i) { std::unique_lock l2(m2); std::cout << "Vlakno 2 rika: "; std::cout.flush(); // vynutí výpis std::unique_lock l1(m1); std::cout << "ahoj!\n"; } }); t1.join(); t2.join(); } ==== Livelock ==== Na Livelock se můžeme dívat jako na slabší verzi uváznutí. Vlákna sice postupují, ale program pobíhá __signifikantně__ pomaleji, nežli by probíhal v jednom vláknu. K livelocku dochází například pokud se více vláken snaží zamknout stejné mutexy, ale po čase je zase odemkne, jako například v ukázce: #include #include #include #include #include using namespace std::chrono_literals; int main() { std::mutex m1; std::mutex m2; std::thread t1([&m1, &m2](){ int count = 0; for (int i = 0; i < 1000000; ++i) { std::unique_lock l1(m1); for (int j = 0; j < 5; ++j) { if (m2.try_lock()) { std::cout << "Vlakno 1: " << ++count << '\n'; m2.unlock(); break; } else { std::this_thread::sleep_for(2ms); } } } }); std::thread t2([&m1, &m2]() { int count = 0; for (int i = 0; i < 1000000; ++i) { std::unique_lock l2(m2); for (int j = 0; j < 7; ++j) { if (m1.try_lock()) { std::cout << "Vlakno 2: " << ++count << '\n'; m1.unlock(); break; } else { std::this_thread::sleep_for(3ms); } } } }); t1.join(); t2.join(); } {{ :courses:b6b36pcc:cviceni:zajemce.png?90|Pro zájemce}} ===== Něco navíc ===== Podobně jako jsme implementovali ''synchronized'', které zaručovalo synchronizovaný přístup k nějakému ''T'', můžeme naimplementovat ''concurrent'', které zaručí __neblokující__ vykonávání kódu nad nějakým ''T''. Specificky, kód se bude vykonávat na vlastním vláknu. {{:courses:b6b36pcc:cviceni:cviceni_11_concurrent.zip|Řešení}}