===== Použití vláken ===== Dnes si ukážeme, jak (potenciálně) zrychlit naše programy pomocí vláken. Pokud má náš stroj více procesorů, vlákna nám umožní vykonávat více práce najednou. Na dnešním cvičení začneme s prázdným ''.cpp'' souborem. * Vytvořte funkci, která 10000-krát vypíše ''"Ahoj, XXXXX! Jak se mas?\n"'' na standardní výstup. * Vytvořte funkci, která 10000-krát vypíše ''"Ahoj, OOOOO! Jak se mas?\n"'' na standardní výstup. * Spusťte obě funkce zároveň pomocí objektu ''std::thread'', který v konstruktoru očekává funkci, kterou má provést. * Až budou obě funkce spuštěny, počkejte, až se obě dokončí pomocí metody ''.join()''. Pokud je objekt typu ''std::thread'' zničen bez toho, aby se na něm zavolalo buďto ''.join()'' (čekej na dokončení) nebo ''.detach()'' (nečekej na dokončení), ''std::thread'' je krajně nespokojený -- shodí nám program. Podobně jako jinde v STL je možné vložit do konstruktoru ''std::thread'' cokoliv, co připomíná funkci -- např. funkční objekt nebo lambdu. Funkce, která je spuštěna ve vlastním vlákně, může mít parametry. Druhý a další parametr konstruktoru ''std::thread'' slouží k tomu, aby tyto parametry předal. * Upravte funkce, aby dostávaly parametrem řetězec, který použije jako osloveni místo ''"XXXXX"'' a ''"OOOOO"''. * Funkce sjednoťte do jedné lambda funkce, abyste omezili míru opakování kódu. Vyzkoušejte si uložit lambda funkci do proměnné -- typ lambdy nelze napsat, takže je nutno použít ''auto''. {{:courses:a7b36pjc:cviceni:cviceni_11_thread.zip|Řešení}} Co se teď stane, když spustíme program? Vidíme, že některé výpisy jsou teď navzájem pomíchané. Důvodem je, že teď každá iterace v každém vlákně použije třikrát výpis na standardní výstup, a tyto tři použití se mohou navzájem libovolně střídat. * Vytvořte objekt typu ''std::mutex'', který budou obě vlákna sdílet, ale pouze jedno vlákno může mutex zamknout -- druhé vlákno musí počkat. * Zpřístupněte objekt typu ''std::mutex'' funkci vlákna. Do //capture listu// lambda funkce přidejte referenci na mutex, nebo pošlete referenci na mutex dalším parametrem. * V každé iteraci před výpisem zamkněte mutex pomocí objektu typu ''std::unique_lock''. V destruktoru ''std::unique_lock'' se mutex sám odemkne. Nyní se nám výpisy přestanou prolínat, protože druhé vlákno nesmí začít vypisovat, dokud první vlákno nedokončí řádek. {{:courses:a7b36pjc:cviceni:cviceni_11_mutex.zip|Řešení}} ===== Uváznutí (deadlock) ===== Zkusme si pustit následující program: #include #include #include int main() { std::mutex m1; std::mutex m2; std::thread t1([&m1, &m2](){ for (int i = 0; i < 1000000; ++i) { std::unique_lock l1(m1); std::cout << "Vlakno 1 rika: "; std::cout.flush(); // vynutí výpis std::unique_lock l2(m2); std::cout << "ahoj!\n"; } }); std::thread t2([&m1, &m2]() { for (int i = 0; i < 1000000; ++i) { std::unique_lock l2(m2); std::cout << "Vlakno 2 rika: "; std::cout.flush(); // vynutí výpis std::unique_lock l1(m1); std::cout << "ahoj!\n"; } }); t1.join(); t2.join(); } Co se stane po jeho spuštění? Jak k tomu dojde? Jak to vyřešit? ===== Livelock ===== Zkusme si pustit následující program: #include #include #include #include #include using namespace std::chrono_literals; int main() { std::mutex m1; std::mutex m2; std::thread t1([&m1, &m2](){ int count = 0; for (int i = 0; i < 1000000; ++i) { std::unique_lock l1(m1); for (int j = 0; j < 5; ++j) { if (m2.try_lock()) { std::cout << "Vlakno 1: " << ++count << '\n'; m2.unlock(); break; } else { std::this_thread::sleep_for(2ms); } } } }); std::thread t2([&m1, &m2]() { int count = 0; for (int i = 0; i < 1000000; ++i) { std::unique_lock l2(m2); for (int j = 0; j < 7; ++j) { if (m1.try_lock()) { std::cout << "Vlakno 2: " << ++count << '\n'; m1.unlock(); break; } else { std::this_thread::sleep_for(3ms); } } } }); t1.join(); t2.join(); } Co se stane po jeho spuštění? Jak k tomu dojde? Jak to vyřešit? ===== Race condition ===== Zkusme si pustit následující program: #include #include int main() { int counter = 0; auto thread_func = [&counter]() { for (int i = 0; i < 10000000; ++i) { counter++; counter--; } }; std::thread t1(thread_func); std::thread t2(thread_func); t1.join(); t2.join(); std::cout << counter << '\n'; } Co se stane po jeho spuštění? Jak k tomu dojde? Jak to vyřešit? * Vyřešte situaci pomocí zamykání ''std::mutex''. * Vyřešte situaci náhradou typu proměnné ''counter'' za ''std::atomic''. * Vyřešte situaci vlastní třídou ''atomic_counter'', která má metody ''.value()'', ''.increment()'' a ''.decrement()'' a která obsahuje ''int'' a ''std::mutex''. {{:courses:a7b36pjc:cviceni:cviceni_11_counter.zip|Řešení}} Vyzkoušejte třídu ''atomic_counter'' na této funkci ''main'': int main() { atomic_counter counter; auto thread_func = [&counter]() { while (counter.value() < 5) { counter.increment(); } }; std::thread t1(thread_func); std::thread t2(thread_func); t1.join(); t2.join(); std::cout << counter.value() << '\n'; } Co je špatně s touto funkcí ''main''? Co s tím? * Přidejte do ''atomic_counter'' metodu ''.increment_if_less_than(int)''. Vidíte nějakou vadu v našem řešení problému? Představte si, že chceme, aby vlákna provedla nějakou akci přesně 5-krát. Co s tím? * Přidejte metodě ''increment_if_less_than'' návratovou hodnotu typu ''bool'', která nám sdělí, zda se inkrementace povedla. {{:courses:a7b36pjc:cviceni:cviceni_11_counter2.zip|Řešení}} Asi tušíte, že pořád naše řešení není příliš obecné. Vždy, když budeme chtít, aby počítadlo umělo něco nového, zjistíme, že budeme potřebovat novou metodu. Co když budeme chtít přičítat víc, než 1? Co když budeme chtít odčítat? Co když budeme chtít něco provést, pouze pokud je hodnota počítadla dělitelná třemi? Možnosti jsou různorodé a každé použití by si vyžádalo novou metodu. * Najděte způsob, jak vyhodnotit v ''atomic_counter'' libovolnou podmínku a v závislosti na úspěchu provést libovolnou akci. {{:courses:a7b36pjc:cviceni:cviceni_11_counter3.zip|Řešení}} ===== Použití vláken ===== Podíváme se na použití vláken pro složitější úlohy. Většina vláknových problémů se dá napasovat na dobře známé a řešené problémy. My se podíváme na implementaci [[https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem|producent-consumer]] problému, kde alespoň dvě vlákna sdílejí frontu s omezenou kapacitou. Náš producent bude vytvářet čísla - inty a náš spotřebitel je bude z fronty konzumovat. Pokud chceme, můžeme si představit tato čísla jako indexy do jiné, nesynchronizované kolekce (Alternativně můžeme později nasadit šablony). Základní schéma je jednoduché class blocking_bounded_queue { void add(const ham& h); const ham& take(const ham& h); }; blocking_bounded_queue shared_queue; void producer() { while (true) { auto ham = produce_ham(); shared_queue.add(ham); } } void consumer() { while (true) { auto ham = shared_queue.take(); eat_ham(ham); } } ale tímto jsme pouze přesunuli složitost dovnitř ''blocking_bounded_queue'', do funkcí ''add'' a ''take''. Takže, jak naimplementovat ''blocking_bounded_queue''? * Určitě do objektu nesmíme pustit více vláken zároveň. * Určitě musíme počítat kapacitu * Určitě musíme počítat kolik máme prvků * Musíme mít kam prvky ukládat Dále máme i několik méně očividných požadavků, například chceme, aby volání funkce ''take'' tzv. blokovalo (nedojde k návratu z funkce) dokud nelze z fronty něco vybrat a volání funkce ''add'' blokovalo dokud nelze do fronty prvek přidat. To vyplývá ze struktury producenta a spotřebitele, kteří předpokládají, že tyto funkce prostě proběhnou. To znamená, že funkce ''take'' a ''add'' musejí mít na čem čekat (například ''std::condition_variable''). Zde je kostra ''blocking_bounded_queue'', splňující první 4 podmínky. Jak doimplementujete funkce ''add'' a ''take'', aby splňovala i poslední podmínku? #include #include class blocking_bounded_queue { public: void add(int i) { std::unique_lock h {m}; // zabrani vstupu vice vlaken if (q.size() < capacity){ q.push(i); } else { // ? } } int take() { std::unique_lock h {m}; // zabrani vstupu vice vlaken if (q.size() > 0){ int temp = q.front(); q.pop(); return temp; } else { // ? } } private: std::queue queue; std::size_t size_limit; std::mutex mutex; }; {{:courses:a7b36pjc:cviceni:cviceni_11_bbq.zip|Řešení}} ===== Monitor ===== Trik, který jsme použili abychom umožnili provést bezpečně libovolnou akci nad počítadlem, se dá generalizovat pro arbitrární typ. Takové šabloně se občas říká ''monitor''. Jak se taková šablona implementuje? {{:courses:a7b36pjc:cviceni:cviceni_11_monitor.zip|Řešení}}