{{page>courses:b6b36pjc:styles#common&noheader&nofooter}} {{page>courses:b6b36pjc:styles#cviceni&noheader&nofooter}} ====== Cvičení 11: Vlákna a paralelismus ====== Práce s jedním vláknem bývá pohodlnější, snažší a bezpečnější nežli práce s více vlákny, ale často je stejně potřeba jich použít víc. Použití vláken se dá nahrubo rozdělit do 3 kategorií * Rozdělení stejné práce mezi N vláken * Rozdělení zpracování dat do N kroků (Jedno vlákno načítá data, další je zpracovává) * Zmenšení latence zpracování vstupů (GUI má obvykle separátní vlákno) V tomto My si vyzkoušíme jednoduché varianty prvních dvou kategorií ((3. kategorie je v zásadě speciální varianta 2. kategorie)) za pomoci nástrojů z C%%++%% standardní knihovny. Na tomto cvičení začneme s prázdným ''.cpp'' souborem. ===== Počítadlo ===== Začneme s jednoduchým příkladem, počítadlem, jako zástupcem 1. kategorie. Tento kód by měl logicky vypsat 0, ale často to tak není. **Proč? Co vypíše na vašem počítači?**((Pokud kód zkompilujete v release módu, tj. s optimalizacemi, může se stát, že překladač usoudí, že ''counter%%++%%; counter--;'' nemá vlastně žádný význam a tento kód vyhodí.)) #include #include int main() { int counter = 0; auto thread_func = [&counter]() { for (int i = 0; i < 10000000; ++i) { counter++; counter--; } }; std::thread t1(thread_func); std::thread t2(thread_func); t1.join(); t2.join(); std::cout << counter << '\n'; } * Jak toto opravit pomocí ''std::mutex''((A ''std::unique_lock''))? * Jak toto opravit pomocí ''std::atomic''? * Jak toto opravit vlastní třídou ''atomic_counter''? * Datovými položkami by měl být vlastní counter a mutex. * Implementujte metody ''int value()'' a ''void increment()''. {{:courses:b6b36pjc:cviceni:cviceni_11_counter.zip|Řešení}} Další častý problém je, že i pokud jsou jednotlivé kroky samostatně atomické, nemusíme z nich být schopni postavit kompletní algoritmus. Zkuste si spustit tento útržek: #include #include #include int main() { atomic_counter counter; auto thread_func = [&counter] () { while (counter.value() < 100) { counter.increment(); } }; std::vector threads; for (int i = 0; i < 4; ++i) { threads.emplace_back(thread_func); } for (auto& thread : threads) { thread.join(); } std::cout << counter.value() << '\n'; } Pravděpodobně vám vypíše ''100'', tak jak byste čekali. Toto krásně ilustruje problém s vlákny, protože tento výsledek nemusíte dostat vždy. * **Přidejte vašemu počítadlu metodu ''bool increment_if_less_than(int)'', která tento problém opraví.** * {{:courses:b6b36pjc:cviceni:cviceni_11_counter2.zip|Řešení}} * **Můžeme nějak vyhodnotit libovolnou podmínku a provést libovolnou akci, aniž bychom pokaždé přidávali novou metodu?** * {{:courses:b6b36pjc:cviceni:cviceni_11_counter3.zip|Řešení}} ===== Fronta ===== 2. kategorie vícevláknových kódů je postavena na frontách. Protože C%%++%% standardní knihovna neposkytuje více-vláknovou frontu, postavíme si jednu na cvičení. Fronta, kterou budeme programovat, je tzv. blokující. To znamená, že pokud se z ní nějaké vlákno pokusí vytáhnout prvek ve chvíli kdy je prázdná, tak danému vláknu bude přerušena exekuce dokud do fronty není nějaký prvek vložen. Začneme s kostrou fronty, která sice je thread-safe, ale pokud je prázdná, tak dojde k chybě. #include #include #include #include #include template class blocking_queue { public: void add(T const& e) { std::unique_lock lg(mutex); queue.push(e); } T take() { std::unique_lock lg(mutex); auto out = std::move(queue.front()); queue.pop(); return out; } private: std::queue queue; std::mutex mutex; }; int main() { using namespace std::chrono_literals; blocking_queue queue; auto producer = [] (blocking_queue& queue) { for (int i = 0; i < 100; ++i) { queue.add(i); std::this_thread::sleep_for(100ms); } }; auto consumer = [] (blocking_queue& queue) { while (true) { auto elem = queue.take(); std::cout << elem << '\n'; } }; std::thread t1(producer, std::ref(queue)); std::thread t2(consumer, std::ref(queue)); t1.join(); t2.join(); } * **Opravte frontu tak, aby se vlákno volající ''take'' zablokovalo, pokud je fronta prázdná.** K čekání se vám bude hodit [[https://en.cppreference.com/w/cpp/thread/condition_variable|std::condition_variable]]. {{:courses:b6b36pjc:cviceni:cviceni_11_queue-1.zip|Řešení}} Teď již nedochází k chybě pokud se volá ''take()'' na prázdné frontě, ale dochází k jinému problému: program se nikdy neukončí, protože ''consumer'' vlákno nikdy nedoběhne. Vyřešíme to tak, že zavedeme koncept zavírání fronty -- do zavřené fronty nelze vkládat další prvky a volání ''take'' signalizuje jestli byl získán prvek, nebo jestli je fronta zavřená a prázdná. * Přidejte frontě proměnnou ''closing'' a metodu ''void close()'', která ji nastaví na ''true'' a probudí všechna vlákna * Zmeňte metodu ''add'' tak aby signalizovala jestli ke vložení opravdu došlo, a v případě zavřené fronty prvek nevkládala * Podobným způsobem změňte ''take'' * Přidejte do funkce ''producer'' volání metody ''close'' * Upravte funkci ''consumer'' tak aby se čtecí smyčka ukončila když je fronta zavřená {{:courses:b6b36pjc:cviceni:cviceni_11_queue-2.zip|Řešení}} Naše nová fronta teď umí signalizovat konzumentům kdy už z ní nejde nic získat, takže náš program časem doběhne. Stále ale ještě budeme chtít udělat jednu změnu, specificky budeme chtít, aby fronta nemohla být příliš veliká -- pokud by se třeba zasekl konzument, nechceme aby producent přidával nové prvky do nekonečna. * Přidejte frontě konstruktor, který bere maximální velikost * Přidejte do ''add'' kontrolu velikosti, která případně zablokuje vkládající vlákno * Zkontrolujte zda ''close'' probudí i vlákna uspaná uvnitř ''add'' * Přejmenujte frontu na ''blocking_bounded_queue'' {{:courses:b6b36pjc:cviceni:cviceni_11_queue-3.zip|Řešení}} Námět k zamyšlení: **Pokud bude do naší fronty zapisovat více vláken a číst z ní bude též více vláken, dojde k chybě?** ===== Synchronized ===== Trik který jsme použili pro exekuci arbitrární logiky uvnitř ''atomic_counter'' se dá zobecnit do šablony na arbitrárním typem. Zkusíme si tedy teď vytvořit třídu pojmenovanou ''synchronized'', která nám umožní synchronizovat arbitrární exekuci nad libovolnou třídou. * Vytvořte šablonovou třídu ''synchronized'' * Použijte ji k synchronizaci více-vláknového zápisu na ''std::cout'' {{:courses:b6b36pjc:cviceni:cviceni_11_synchronized.zip|Řešení}} ===== Ostatní problémy ===== V prvním příkladu jsme měli problém, kdy jsme nedostatečně synchronizovali zápis do stejné paměti. Tomuto problému se říká __race condition__ a patří mezi časté potíže při použití více vláken. Každý kdo pracuje s více vlákny, by ještě měl vědět o tzv. uváznutí (__deadlock__) a __livelocku__. Deadlock znamená, že program nepostupuje a vlákna nikdy nedoběhnou. Dochází k němu když na sebe vzájemně čeká několik vláken, jako například v tomto útržku: #include #include #include int main() { std::mutex m1; std::mutex m2; std::thread t1([&m1, &m2](){ for (int i = 0; i < 1000000; ++i) { std::unique_lock l1(m1); std::cout << "Vlakno 1 rika: "; std::cout.flush(); // vynutí výpis std::unique_lock l2(m2); std::cout << "ahoj!\n"; } }); std::thread t2([&m1, &m2]() { for (int i = 0; i < 1000000; ++i) { std::unique_lock l2(m2); std::cout << "Vlakno 2 rika: "; std::cout.flush(); // vynutí výpis std::unique_lock l1(m1); std::cout << "ahoj!\n"; } }); t1.join(); t2.join(); } Na Livelock se můžeme dívat jako na slabší verzi uváznutí. Vlákna sice postupují, ale program pobíhá __signifikantně__ pomaleji, nežli by probíhal v jednom vláknu. K livelocku dochází například pokud se více vláken snaží zamknout stejné mutexy, ale po čase je zase odemkne, jako například v ukázce: #include #include #include #include #include using namespace std::chrono_literals; int main() { std::mutex m1; std::mutex m2; std::thread t1([&m1, &m2](){ int count = 0; for (int i = 0; i < 1000000; ++i) { std::unique_lock l1(m1); for (int j = 0; j < 5; ++j) { if (m2.try_lock()) { std::cout << "Vlakno 1: " << ++count << '\n'; m2.unlock(); break; } else { std::this_thread::sleep_for(2ms); } } } }); std::thread t2([&m1, &m2]() { int count = 0; for (int i = 0; i < 1000000; ++i) { std::unique_lock l2(m2); for (int j = 0; j < 7; ++j) { if (m1.try_lock()) { std::cout << "Vlakno 2: " << ++count << '\n'; m1.unlock(); break; } else { std::this_thread::sleep_for(3ms); } } } }); t1.join(); t2.join(); } ===== Extra ===== Podobně jako jsme implementovali ''synchronized'', které zaručovalo synchronizovaný přístup k nějakému ''T'', můžeme naimplementovat ''concurrent'', které zaručí __neblokující__ vykonávání kódu nad nějakým ''T''. Specificky, kód se bude vykonávat na vlastním vláknu. {{:courses:b6b36pjc:cviceni:cviceni_11_concurrent.zip|Řešení}}