{{page>courses:b6b36pjc:styles#common&noheader&nofooter}}
{{page>courses:b6b36pjc:styles#cviceni&noheader&nofooter}}
====== Cvičení 11: Vlákna a paralelismus ======
Práce s jedním vláknem bývá pohodlnější, snažší a bezpečnější nežli práce
s více vlákny, ale často je stejně potřeba jich použít víc.
Použití vláken se dá nahrubo rozdělit do 3 kategorií
* Rozdělení stejné práce mezi N vláken
* Rozdělení zpracování dat do N kroků (Jedno vlákno načítá data, další je zpracovává)
* Zmenšení latence zpracování vstupů (GUI má obvykle separátní vlákno)
V tomto My si vyzkoušíme jednoduché varianty prvních dvou kategorií
((3. kategorie je v zásadě speciální varianta 2. kategorie)) za pomoci
nástrojů z C%%++%% standardní knihovny.
Na tomto cvičení začneme s prázdným ''.cpp'' souborem.
===== Počítadlo =====
Začneme s jednoduchým příkladem, počítadlem, jako zástupcem 1. kategorie.
Tento kód by měl logicky vypsat 0, ale často to tak není.
**Proč? Co vypíše na vašem počítači?**((Pokud kód zkompilujete v release módu, tj. s optimalizacemi, může se stát, že překladač usoudí, že ''counter%%++%%; counter--;'' nemá vlastně žádný význam a tento kód vyhodí.))
#include
#include
int main() {
int counter = 0;
auto thread_func = [&counter]() {
for (int i = 0; i < 10000000; ++i) {
counter++;
counter--;
}
};
std::thread t1(thread_func);
std::thread t2(thread_func);
t1.join();
t2.join();
std::cout << counter << '\n';
}
* Jak toto opravit pomocí ''std::mutex''((A ''std::unique_lock''))?
* Jak toto opravit pomocí ''std::atomic''?
* Jak toto opravit vlastní třídou ''atomic_counter''?
* Datovými položkami by měl být vlastní counter a mutex.
* Implementujte metody ''int value()'' a ''void increment()''.
{{:courses:b6b36pjc:cviceni:cviceni_11_counter.zip|Řešení}}
Další častý problém je, že i pokud jsou jednotlivé kroky samostatně
atomické, nemusíme z nich být schopni postavit kompletní algoritmus.
Zkuste si spustit tento útržek:
#include
#include
#include
int main() {
atomic_counter counter;
auto thread_func = [&counter] () {
while (counter.value() < 100) {
counter.increment();
}
};
std::vector threads;
for (int i = 0; i < 4; ++i) {
threads.emplace_back(thread_func);
}
for (auto& thread : threads) {
thread.join();
}
std::cout << counter.value() << '\n';
}
Pravděpodobně vám vypíše ''100'', tak jak byste čekali. Toto krásně
ilustruje problém s vlákny, protože tento výsledek nemusíte dostat vždy.
* **Přidejte vašemu počítadlu metodu ''bool increment_if_less_than(int)'', která tento problém opraví.**
* {{:courses:b6b36pjc:cviceni:cviceni_11_counter2.zip|Řešení}}
* **Můžeme nějak vyhodnotit libovolnou podmínku a provést libovolnou akci, aniž bychom pokaždé přidávali novou metodu?**
* {{:courses:b6b36pjc:cviceni:cviceni_11_counter3.zip|Řešení}}
===== Fronta =====
2. kategorie vícevláknových kódů je postavena na frontách. Protože C%%++%%
standardní knihovna neposkytuje více-vláknovou frontu, postavíme si jednu
na cvičení. Fronta, kterou budeme programovat, je tzv. blokující. To
znamená, že pokud se z ní nějaké vlákno pokusí vytáhnout prvek ve chvíli
kdy je prázdná, tak danému vláknu bude přerušena exekuce dokud do fronty
není nějaký prvek vložen.
Začneme s kostrou fronty, která sice je thread-safe, ale pokud je prázdná,
tak dojde k chybě.
#include
#include
#include
#include
#include
template
class blocking_queue {
public:
void add(T const& e) {
std::unique_lock lg(mutex);
queue.push(e);
}
T take() {
std::unique_lock lg(mutex);
auto out = std::move(queue.front()); queue.pop();
return out;
}
private:
std::queue queue;
std::mutex mutex;
};
int main() {
using namespace std::chrono_literals;
blocking_queue queue;
auto producer = [] (blocking_queue& queue) {
for (int i = 0; i < 100; ++i) {
queue.add(i);
std::this_thread::sleep_for(100ms);
}
};
auto consumer = [] (blocking_queue& queue) {
while (true) {
auto elem = queue.take();
std::cout << elem << '\n';
}
};
std::thread t1(producer, std::ref(queue));
std::thread t2(consumer, std::ref(queue));
t1.join();
t2.join();
}
* **Opravte frontu tak, aby se vlákno volající ''take'' zablokovalo, pokud je fronta prázdná.**
K čekání se vám bude hodit
[[https://en.cppreference.com/w/cpp/thread/condition_variable|std::condition_variable]].
{{:courses:b6b36pjc:cviceni:cviceni_11_queue-1.zip|Řešení}}
Teď již nedochází k chybě pokud se volá ''take()'' na prázdné frontě, ale
dochází k jinému problému: program se nikdy neukončí, protože ''consumer''
vlákno nikdy nedoběhne. Vyřešíme to tak, že zavedeme koncept zavírání
fronty -- do zavřené fronty nelze vkládat další prvky a volání ''take''
signalizuje jestli byl získán prvek, nebo jestli je fronta zavřená a
prázdná.
* Přidejte frontě proměnnou ''closing'' a metodu ''void close()'', která ji nastaví na ''true'' a probudí všechna vlákna
* Zmeňte metodu ''add'' tak aby signalizovala jestli ke vložení opravdu došlo, a v případě zavřené fronty prvek nevkládala
* Podobným způsobem změňte ''take''
* Přidejte do funkce ''producer'' volání metody ''close''
* Upravte funkci ''consumer'' tak aby se čtecí smyčka ukončila když je fronta zavřená
{{:courses:b6b36pjc:cviceni:cviceni_11_queue-2.zip|Řešení}}
Naše nová fronta teď umí signalizovat konzumentům kdy už z ní nejde nic
získat, takže náš program časem doběhne. Stále ale ještě budeme chtít
udělat jednu změnu, specificky budeme chtít, aby fronta nemohla být příliš
veliká -- pokud by se třeba zasekl konzument, nechceme aby producent
přidával nové prvky do nekonečna.
* Přidejte frontě konstruktor, který bere maximální velikost
* Přidejte do ''add'' kontrolu velikosti, která případně zablokuje vkládající vlákno
* Zkontrolujte zda ''close'' probudí i vlákna uspaná uvnitř ''add''
* Přejmenujte frontu na ''blocking_bounded_queue''
{{:courses:b6b36pjc:cviceni:cviceni_11_queue-3.zip|Řešení}}
Námět k zamyšlení: **Pokud bude do naší fronty zapisovat více vláken a číst
z ní bude též více vláken, dojde k chybě?**
===== Synchronized =====
Trik který jsme použili pro exekuci arbitrární logiky uvnitř ''atomic_counter''
se dá zobecnit do šablony na arbitrárním typem. Zkusíme si tedy teď vytvořit
třídu pojmenovanou ''synchronized'', která nám umožní synchronizovat
arbitrární exekuci nad libovolnou třídou.
* Vytvořte šablonovou třídu ''synchronized''
* Použijte ji k synchronizaci více-vláknového zápisu na ''std::cout''
{{:courses:b6b36pjc:cviceni:cviceni_11_synchronized.zip|Řešení}}
===== Ostatní problémy =====
V prvním příkladu jsme měli problém, kdy jsme nedostatečně synchronizovali
zápis do stejné paměti. Tomuto problému se říká __race condition__ a patří
mezi časté potíže při použití více vláken. Každý kdo pracuje s více vlákny,
by ještě měl vědět o tzv. uváznutí (__deadlock__) a __livelocku__.
Deadlock znamená, že program nepostupuje a vlákna nikdy nedoběhnou.
Dochází k němu když na sebe vzájemně čeká několik vláken, jako například
v tomto útržku:
#include
#include
#include
int main() {
std::mutex m1;
std::mutex m2;
std::thread t1([&m1, &m2](){
for (int i = 0; i < 1000000; ++i) {
std::unique_lock l1(m1);
std::cout << "Vlakno 1 rika: ";
std::cout.flush(); // vynutí výpis
std::unique_lock l2(m2);
std::cout << "ahoj!\n";
}
});
std::thread t2([&m1, &m2]() {
for (int i = 0; i < 1000000; ++i) {
std::unique_lock l2(m2);
std::cout << "Vlakno 2 rika: ";
std::cout.flush(); // vynutí výpis
std::unique_lock l1(m1);
std::cout << "ahoj!\n";
}
});
t1.join();
t2.join();
}
Na Livelock se můžeme dívat jako na slabší verzi uváznutí. Vlákna sice
postupují, ale program pobíhá __signifikantně__ pomaleji, nežli by probíhal
v jednom vláknu. K livelocku dochází například pokud se více vláken snaží
zamknout stejné mutexy, ale po čase je zase odemkne, jako například v ukázce:
#include
#include
#include
#include
#include
using namespace std::chrono_literals;
int main() {
std::mutex m1;
std::mutex m2;
std::thread t1([&m1, &m2](){
int count = 0;
for (int i = 0; i < 1000000; ++i) {
std::unique_lock l1(m1);
for (int j = 0; j < 5; ++j) {
if (m2.try_lock()) {
std::cout << "Vlakno 1: " << ++count << '\n';
m2.unlock();
break;
}
else {
std::this_thread::sleep_for(2ms);
}
}
}
});
std::thread t2([&m1, &m2]() {
int count = 0;
for (int i = 0; i < 1000000; ++i) {
std::unique_lock l2(m2);
for (int j = 0; j < 7; ++j) {
if (m1.try_lock()) {
std::cout << "Vlakno 2: " << ++count << '\n';
m1.unlock();
break;
}
else {
std::this_thread::sleep_for(3ms);
}
}
}
});
t1.join();
t2.join();
}
===== Extra =====
Podobně jako jsme implementovali ''synchronized'', které zaručovalo
synchronizovaný přístup k nějakému ''T'', můžeme naimplementovat
''concurrent'', které zaručí __neblokující__ vykonávání kódu nad
nějakým ''T''. Specificky, kód se bude vykonávat na vlastním vláknu.
{{:courses:b6b36pjc:cviceni:cviceni_11_concurrent.zip|Řešení}}