Table of Contents

Cvičení 11: Vlákna a paralelismus

Průvodce studiem Práce s jedním vláknem bývá pohodlnější, snazší a bezpečnější nežli práce s více vlákny, ale často je stejně potřeba jich použít víc.

Použití vláken se dá nahrubo rozdělit do 3 kategorií

V tomto My si vyzkoušíme jednoduché varianty prvních dvou kategorií 1) za pomoci nástrojů z C++ standardní knihovny.

Na tomto cvičení začneme s prázdným .cpp souborem.

Počítadlo

Začneme s jednoduchým příkladem, počítadlem, jako zástupcem 1. kategorie. Tento kód by měl logicky vypsat 0, ale často to tak není.

Proč? Co vypíše na vašem počítači?2)

#include <thread>
#include <iostream>
 
int main() {
    int counter = 0;
 
    auto thread_func = [&counter]() {
        for (int i = 0; i < 10000000; ++i) {
            counter++;
            counter--;
        }
    };
 
    std::thread t1(thread_func);
    std::thread t2(thread_func);
    t1.join();
    t2.join();
 
    std::cout << counter << '\n';
}

Řešení

Další častý problém je, že i pokud jsou jednotlivé kroky samostatně atomické, nemusíme z nich být schopni postavit kompletní algoritmus. Zkuste si spustit tento útržek:

#include <thread>
#include <iostream>
#include <vector>
 
int main() {
    atomic_counter counter;
 
    auto thread_func = [&counter] () {
        while (counter.value() < 100) {
            counter.increment();
        }
    };
 
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i) {
        threads.emplace_back(thread_func);
    }
    for (auto& thread : threads) {
        thread.join();
    }
 
    std::cout << counter.value() << '\n';
}

Pravděpodobně vám vypíše 100, tak jak byste čekali. Toto krásně ilustruje problém s vlákny, protože tento výsledek nemusíte dostat vždy.

Řešení

Řešení

Fronta

2. kategorie vícevláknových kódů je postavena na frontách. Protože C++ standardní knihovna neposkytuje více-vláknovou frontu, postavíme si jednu na cvičení. Fronta, kterou budeme programovat, je tzv. blokující. To znamená, že pokud se z ní nějaké vlákno pokusí vytáhnout prvek ve chvíli kdy je prázdná, tak danému vláknu bude přerušena exekuce dokud do fronty není nějaký prvek vložen.

Začneme s kostrou fronty, která sice je thread-safe, ale pokud je prázdná, tak dojde k chybě.

#include <queue>
#include <mutex>
#include <thread>
#include <iostream>
#include <chrono>
 
template <typename T>
class blocking_queue {
public:
    void add(T const& e) {
        std::unique_lock<std::mutex> lg(mutex);
        queue.push(e);
    }
 
    T take() {
        std::unique_lock<std::mutex> lg(mutex);
        auto out = std::move(queue.front()); queue.pop();
        return out;
    }
 
private:
    std::queue<T> queue;
    std::mutex mutex;
};
 
int main() {
    using namespace std::chrono_literals;
    blocking_queue<int> queue;
    auto producer = [] (blocking_queue<int>& queue) {
        for (int i = 0; i < 100; ++i) {
            queue.add(i);
            std::this_thread::sleep_for(100ms);
        }
    };
 
    auto consumer = [] (blocking_queue<int>& queue) {
        while (true) {
            auto elem = queue.take();
            std::cout << elem << '\n';
        }
    };
 
    std::thread t1(producer, std::ref(queue));
    std::thread t2(consumer, std::ref(queue));
 
    t1.join();
    t2.join();
}

Úkoly k procvičení Opravte frontu tak, aby se vlákno volající take zablokovalo, pokud je fronta prázdná.

K čekání se vám bude hodit std::condition_variable.

Řešení

Teď již nedochází k chybě pokud se volá take() na prázdné frontě, ale dochází k jinému problému: program se nikdy neukončí, protože consumer vlákno nikdy nedoběhne. Vyřešíme to tak, že zavedeme koncept zavírání fronty – do zavřené fronty nelze vkládat další prvky a volání take signalizuje jestli byl získán prvek, nebo jestli je fronta zavřená a prázdná.

Řešení

Naše nová fronta teď umí signalizovat konzumentům kdy už z ní nejde nic získat, takže náš program časem doběhne. Stále ale ještě budeme chtít udělat jednu změnu, specificky budeme chtít, aby fronta nemohla být příliš veliká – pokud by se třeba zasekl konzument, nechceme aby producent přidával nové prvky do nekonečna.

Řešení

Námět k zamyšlení: Pokud bude do naší fronty zapisovat více vláken a číst z ní bude též více vláken, dojde k chybě?

Pro zájemce

Synchronized<T>

Trik který jsme použili pro exekuci arbitrární logiky uvnitř atomic_counter se dá zobecnit do šablony na arbitrárním typem. Zkusíme si tedy teď vytvořit třídu pojmenovanou synchronized, která nám umožní synchronizovat arbitrární exekuci nad libovolnou třídou.

Řešení

Synchronizace nad více proměnnými

Vzájemnou synchronizaci vláken reprezentuje problém večeřících filosofů.4) „Představme si kulatý stůl, na kterém je po obvodu položených 5 talířů. Mezi každými dvěma talíři je jedna čínská hůlka, celkem je jich tedy také 5. Dále si představme, že za tímto stolem sedí pět filozofů, každý za svým talířem. Filozof představuje nějaký proces a může provádět dvě činnosti – buď obědvat (eat), nebo filozofovat (thing). Aby mohl obědvat, musí si vzít dvě čínské hůlky. Pokud chce filozofovat, nedrží ani jednu hůlku.“

Začneme s třídou reprezentující problém večeřících filozofů.

#include <array>
#include <thread>
#include <chrono>
#include <iostream>
#include <string>
#include <random>
#include <sstream>
 
constexpr  int no_of_philosophers = 5;
bool locked[no_of_philosophers] = {false, false, false, false, false};
 
class philosopher {
public:
    philosopher(std::string n, int ll, int rr) :
            name(std::move(n)),
            phil(&philosopher::dine, this),
            lf(ll),
            rf(rr)
    { }
    ~philosopher() { phil.join(); }
    void print(std::string text, bool is_locked) {
        std::stringstream ss;
        ss << name << text << "\t";
        if (is_locked)
            for (int i = 0; i < no_of_philosophers; i++)
                if (locked[i])
                    ss << " " << i + 1;
        ss << std::endl;
        std::cout << ss.str();
    }
    void dine() {
        for (int i = 0; i < 10; i++) {
            think();
            eat();
        }
    }
    void eat() {
        locked[lf] = true; locked[rf] = true;
        print(" started eating.", true);
        static std::uniform_int_distribution<int> dist(1, 6);
        std::this_thread::sleep_for(std::chrono::microseconds(dist(rng) * 5));
        print(" finished eating.", true);
        locked[lf] = false; locked[rf] = false;
    }
 
    void think() {
        static std::uniform_int_distribution<int> dist(1, 6);
        std::this_thread::sleep_for(std::chrono::microseconds(dist(rng) * 15));
        print(" is thinking.", false);
    }
private:
    std::string const name;
    std::thread phil;
    int lf, rf;
    std::mt19937 rng{std::random_device{}()};
};
 
int main() {
    std::array<philosopher, no_of_philosophers> philosophers {{
                          { "Aristotle", 0, 1},
                          { "Platon",    1, 2},
                          { "Descartes", 2, 3},
                          { "Kant",      3, 4},
                          { "Nietzsche", 4, 0}}};
    return 0;
}
Z výpisu programu je zřejmé, že hůlky jsou filozofy používány zcela náhodně. Upravte program tak, aby správně simuloval definovaný problém. Budete potřebovat mutexy pro jednotlivé vidličky a ty pak synchronně zamykat podle toho, který filozof právě večeří.

Řešení

Ostatní problémy

V prvním příkladu jsme měli problém, kdy jsme nedostatečně synchronizovali zápis do stejné paměti. Tomuto problému se říká race condition a patří mezi časté potíže při použití více vláken. Každý kdo pracuje s více vlákny, by ještě měl vědět o tzv. uváznutí (deadlock) a livelocku.

Deadlock

Deadlock znamená, že program nepostupuje a vlákna nikdy nedoběhnou. Dochází k němu když na sebe vzájemně čeká několik vláken, jako například v tomto útržku:

#include <thread>
#include <mutex>
#include <iostream>
 
int main() {
    std::mutex m1;
    std::mutex m2;
 
    std::thread t1([&m1, &m2](){
        for (int i = 0; i < 1000000; ++i) {
            std::unique_lock<std::mutex> l1(m1);
            std::cout << "Vlakno 1 rika: ";
            std::cout.flush(); // vynutí výpis
            std::unique_lock<std::mutex> l2(m2);
            std::cout << "ahoj!\n";
        }
    });
    std::thread t2([&m1, &m2]() {
        for (int i = 0; i < 1000000; ++i) {
            std::unique_lock<std::mutex> l2(m2);
            std::cout << "Vlakno 2 rika: ";
            std::cout.flush(); // vynutí výpis
            std::unique_lock<std::mutex> l1(m1);
            std::cout << "ahoj!\n";
        }
    });
 
    t1.join();
    t2.join();
}

Livelock

Na Livelock se můžeme dívat jako na slabší verzi uváznutí. Vlákna sice postupují, ale program pobíhá signifikantně pomaleji, nežli by probíhal v jednom vláknu. K livelocku dochází například pokud se více vláken snaží zamknout stejné mutexy, ale po čase je zase odemkne, jako například v ukázce:

#include <string>
#include <mutex>
#include <thread>
#include <chrono>
#include <iostream>
 
using namespace std::chrono_literals;
 
int main() {
    std::mutex m1;
    std::mutex m2;
 
    std::thread t1([&m1, &m2](){
        int count = 0;
        for (int i = 0; i < 1000000; ++i) {
            std::unique_lock<std::mutex> l1(m1);
            for (int j = 0; j < 5; ++j) {
                if (m2.try_lock()) {
                    std::cout << "Vlakno 1: " << ++count << '\n';
                    m2.unlock();
                    break;
                }
                else {
                    std::this_thread::sleep_for(2ms);
                }
            }
        }
    });
    std::thread t2([&m1, &m2]() {
        int count = 0;
        for (int i = 0; i < 1000000; ++i) {
            std::unique_lock<std::mutex> l2(m2);
            for (int j = 0; j < 7; ++j) {
                if (m1.try_lock()) {
                    std::cout << "Vlakno 2: " << ++count << '\n';
                    m1.unlock();
                    break;
                }
                else {
                    std::this_thread::sleep_for(3ms);
                }
            }
        }
    });
 
    t1.join();
    t2.join();
}

Pro zájemce

Něco navíc

Podobně jako jsme implementovali synchronized<T>, které zaručovalo synchronizovaný přístup k nějakému T, můžeme naimplementovat concurrent<T>, které zaručí neblokující vykonávání kódu nad nějakým T. Specificky, kód se bude vykonávat na vlastním vláknu.

Řešení

1)
3. kategorie je v zásadě speciální varianta 2. kategorie
2)
Pokud kód zkompilujete v release módu, tj. s optimalizacemi, může se stát, že překladač usoudí, že counter++; counter–; nemá vlastně žádný význam a tento kód vyhodí.
3)
A std::unique_lock