Warning
This page is located in archive. Go to the latest version of this course pages. Go the latest version of this page.

Cvičení 11: Vlákna a paralelismus

Průvodce studiem Práce s jedním vláknem bývá pohodlnější, snazší a bezpečnější nežli práce s více vlákny, ale často je stejně potřeba jich použít víc.

Použití vláken se dá nahrubo rozdělit do 3 kategorií

  • Rozdělení stejné práce mezi N vláken
  • Rozdělení zpracování dat do N kroků (Jedno vlákno načítá data, další je zpracovává)
  • Zmenšení latence zpracování vstupů (GUI má obvykle separátní vlákno)

V tomto My si vyzkoušíme jednoduché varianty prvních dvou kategorií 1) za pomoci nástrojů z C++ standardní knihovny.

Na tomto cvičení začneme s prázdným .cpp souborem.

Počítadlo

Začneme s jednoduchým příkladem, počítadlem, jako zástupcem 1. kategorie. Tento kód by měl logicky vypsat 0, ale často to tak není.

Proč? Co vypíše na vašem počítači?2)

#include <thread>
#include <iostream>
 
int main() {
    int counter = 0;
 
    auto thread_func = [&counter]() {
        for (int i = 0; i < 10000000; ++i) {
            counter++;
            counter--;
        }
    };
 
    std::thread t1(thread_func);
    std::thread t2(thread_func);
    t1.join();
    t2.join();
 
    std::cout << counter << '\n';
}

  • Jak toto opravit pomocí std::mutex3)?
  • Jak toto opravit pomocí std::atomic<int>?
  • Jak toto opravit vlastní třídou atomic_counter?
    • Datovými položkami by měl být vlastní counter a mutex.
    • Implementujte metody int value() a void increment().

Řešení

Další častý problém je, že i pokud jsou jednotlivé kroky samostatně atomické, nemusíme z nich být schopni postavit kompletní algoritmus. Zkuste si spustit tento útržek:

#include <thread>
#include <iostream>
#include <vector>
 
int main() {
    atomic_counter counter;
 
    auto thread_func = [&counter] () {
        while (counter.value() < 100) {
            counter.increment();
        }
    };
 
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i) {
        threads.emplace_back(thread_func);
    }
    for (auto& thread : threads) {
        thread.join();
    }
 
    std::cout << counter.value() << '\n';
}

Pravděpodobně vám vypíše 100, tak jak byste čekali. Toto krásně ilustruje problém s vlákny, protože tento výsledek nemusíte dostat vždy.

  • Přidejte vašemu počítadlu metodu bool increment_if_less_than(int), která tento problém opraví.

Řešení

  • Můžeme nějak vyhodnotit libovolnou podmínku a provést libovolnou akci, aniž bychom pokaždé přidávali novou metodu?

Řešení

Fronta

2. kategorie vícevláknových kódů je postavena na frontách. Protože C++ standardní knihovna neposkytuje více-vláknovou frontu, postavíme si jednu na cvičení. Fronta, kterou budeme programovat, je tzv. blokující. To znamená, že pokud se z ní nějaké vlákno pokusí vytáhnout prvek ve chvíli kdy je prázdná, tak danému vláknu bude přerušena exekuce dokud do fronty není nějaký prvek vložen.

Začneme s kostrou fronty, která sice je thread-safe, ale pokud je prázdná, tak dojde k chybě.

#include <queue>
#include <mutex>
#include <thread>
#include <iostream>
#include <chrono>
 
template <typename T>
class blocking_queue {
public:
    void add(T const& e) {
        std::unique_lock<std::mutex> lg(mutex);
        queue.push(e);
    }
 
    T take() {
        std::unique_lock<std::mutex> lg(mutex);
        auto out = std::move(queue.front()); queue.pop();
        return out;
    }
 
private:
    std::queue<T> queue;
    std::mutex mutex;
};
 
int main() {
    using namespace std::chrono_literals;
    blocking_queue<int> queue;
    auto producer = [] (blocking_queue<int>& queue) {
        for (int i = 0; i < 100; ++i) {
            queue.add(i);
            std::this_thread::sleep_for(100ms);
        }
    };
 
    auto consumer = [] (blocking_queue<int>& queue) {
        while (true) {
            auto elem = queue.take();
            std::cout << elem << '\n';
        }
    };
 
    std::thread t1(producer, std::ref(queue));
    std::thread t2(consumer, std::ref(queue));
 
    t1.join();
    t2.join();
}

Úkoly k procvičení Opravte frontu tak, aby se vlákno volající take zablokovalo, pokud je fronta prázdná.

K čekání se vám bude hodit std::condition_variable.

Řešení

Teď již nedochází k chybě pokud se volá take() na prázdné frontě, ale dochází k jinému problému: program se nikdy neukončí, protože consumer vlákno nikdy nedoběhne. Vyřešíme to tak, že zavedeme koncept zavírání fronty – do zavřené fronty nelze vkládat další prvky a volání take signalizuje jestli byl získán prvek, nebo jestli je fronta zavřená a prázdná.

  • Přidejte frontě proměnnou closing a metodu void close(), která ji nastaví na true a probudí všechna vlákna
  • Zmeňte metodu add tak aby signalizovala jestli ke vložení opravdu došlo, a v případě zavřené fronty prvek nevkládala
  • Podobným způsobem změňte take
  • Přidejte do funkce producer volání metody close
  • Upravte funkci consumer tak aby se čtecí smyčka ukončila když je fronta zavřená

Řešení

Naše nová fronta teď umí signalizovat konzumentům kdy už z ní nejde nic získat, takže náš program časem doběhne. Stále ale ještě budeme chtít udělat jednu změnu, specificky budeme chtít, aby fronta nemohla být příliš veliká – pokud by se třeba zasekl konzument, nechceme aby producent přidával nové prvky do nekonečna.

  • Přidejte frontě konstruktor, který bere maximální velikost
  • Přidejte do add kontrolu velikosti, která případně zablokuje vkládající vlákno
  • Zkontrolujte zda close probudí i vlákna uspaná uvnitř add
  • Přejmenujte frontu na blocking_bounded_queue

Řešení

Námět k zamyšlení: Pokud bude do naší fronty zapisovat více vláken a číst z ní bude též více vláken, dojde k chybě?

Pro zájemce

Synchronized<T>

Trik který jsme použili pro exekuci arbitrární logiky uvnitř atomic_counter se dá zobecnit do šablony na arbitrárním typem. Zkusíme si tedy teď vytvořit třídu pojmenovanou synchronized, která nám umožní synchronizovat arbitrární exekuci nad libovolnou třídou.

  • Vytvořte šablonovou třídu synchronized<T>
  • Použijte ji k synchronizaci více-vláknového zápisu na std::cout

Řešení

Synchronizace nad více proměnnými

Vzájemnou synchronizaci vláken reprezentuje problém večeřících filosofů.4) „Představme si kulatý stůl, na kterém je po obvodu položených 5 talířů. Mezi každými dvěma talíři je jedna čínská hůlka, celkem je jich tedy také 5. Dále si představme, že za tímto stolem sedí pět filozofů, každý za svým talířem. Filozof představuje nějaký proces a může provádět dvě činnosti – buď obědvat (eat), nebo filozofovat (thing). Aby mohl obědvat, musí si vzít dvě čínské hůlky. Pokud chce filozofovat, nedrží ani jednu hůlku.“

Začneme s třídou reprezentující problém večeřících filozofů.

#include <array>
#include <thread>
#include <chrono>
#include <iostream>
#include <string>
#include <random>
#include <sstream>
 
constexpr  int no_of_philosophers = 5;
bool locked[no_of_philosophers] = {false, false, false, false, false};
 
class philosopher {
public:
    philosopher(std::string n, int ll, int rr) :
            name(std::move(n)),
            phil(&philosopher::dine, this),
            lf(ll),
            rf(rr)
    { }
    ~philosopher() { phil.join(); }
    void print(std::string text, bool is_locked) {
        std::stringstream ss;
        ss << name << text << "\t";
        if (is_locked)
            for (int i = 0; i < no_of_philosophers; i++)
                if (locked[i])
                    ss << " " << i + 1;
        ss << std::endl;
        std::cout << ss.str();
    }
    void dine() {
        for (int i = 0; i < 10; i++) {
            think();
            eat();
        }
    }
    void eat() {
        locked[lf] = true; locked[rf] = true;
        print(" started eating.", true);
        static std::uniform_int_distribution<int> dist(1, 6);
        std::this_thread::sleep_for(std::chrono::microseconds(dist(rng) * 5));
        print(" finished eating.", true);
        locked[lf] = false; locked[rf] = false;
    }
 
    void think() {
        static std::uniform_int_distribution<int> dist(1, 6);
        std::this_thread::sleep_for(std::chrono::microseconds(dist(rng) * 15));
        print(" is thinking.", false);
    }
private:
    std::string const name;
    std::thread phil;
    int lf, rf;
    std::mt19937 rng{std::random_device{}()};
};
 
int main() {
    std::array<philosopher, no_of_philosophers> philosophers {{
                          { "Aristotle", 0, 1},
                          { "Platon",    1, 2},
                          { "Descartes", 2, 3},
                          { "Kant",      3, 4},
                          { "Nietzsche", 4, 0}}};
    return 0;
}
Z výpisu programu je zřejmé, že hůlky jsou filozofy používány zcela náhodně. Upravte program tak, aby správně simuloval definovaný problém. Budete potřebovat mutexy pro jednotlivé vidličky a ty pak synchronně zamykat podle toho, který filozof právě večeří.

Řešení

Ostatní problémy

V prvním příkladu jsme měli problém, kdy jsme nedostatečně synchronizovali zápis do stejné paměti. Tomuto problému se říká race condition a patří mezi časté potíže při použití více vláken. Každý kdo pracuje s více vlákny, by ještě měl vědět o tzv. uváznutí (deadlock) a livelocku.

Deadlock

Deadlock znamená, že program nepostupuje a vlákna nikdy nedoběhnou. Dochází k němu když na sebe vzájemně čeká několik vláken, jako například v tomto útržku:

#include <thread>
#include <mutex>
#include <iostream>
 
int main() {
    std::mutex m1;
    std::mutex m2;
 
    std::thread t1([&m1, &m2](){
        for (int i = 0; i < 1000000; ++i) {
            std::unique_lock<std::mutex> l1(m1);
            std::cout << "Vlakno 1 rika: ";
            std::cout.flush(); // vynutí výpis
            std::unique_lock<std::mutex> l2(m2);
            std::cout << "ahoj!\n";
        }
    });
    std::thread t2([&m1, &m2]() {
        for (int i = 0; i < 1000000; ++i) {
            std::unique_lock<std::mutex> l2(m2);
            std::cout << "Vlakno 2 rika: ";
            std::cout.flush(); // vynutí výpis
            std::unique_lock<std::mutex> l1(m1);
            std::cout << "ahoj!\n";
        }
    });
 
    t1.join();
    t2.join();
}

Livelock

Na Livelock se můžeme dívat jako na slabší verzi uváznutí. Vlákna sice postupují, ale program pobíhá signifikantně pomaleji, nežli by probíhal v jednom vláknu. K livelocku dochází například pokud se více vláken snaží zamknout stejné mutexy, ale po čase je zase odemkne, jako například v ukázce:

#include <string>
#include <mutex>
#include <thread>
#include <chrono>
#include <iostream>
 
using namespace std::chrono_literals;
 
int main() {
    std::mutex m1;
    std::mutex m2;
 
    std::thread t1([&m1, &m2](){
        int count = 0;
        for (int i = 0; i < 1000000; ++i) {
            std::unique_lock<std::mutex> l1(m1);
            for (int j = 0; j < 5; ++j) {
                if (m2.try_lock()) {
                    std::cout << "Vlakno 1: " << ++count << '\n';
                    m2.unlock();
                    break;
                }
                else {
                    std::this_thread::sleep_for(2ms);
                }
            }
        }
    });
    std::thread t2([&m1, &m2]() {
        int count = 0;
        for (int i = 0; i < 1000000; ++i) {
            std::unique_lock<std::mutex> l2(m2);
            for (int j = 0; j < 7; ++j) {
                if (m1.try_lock()) {
                    std::cout << "Vlakno 2: " << ++count << '\n';
                    m1.unlock();
                    break;
                }
                else {
                    std::this_thread::sleep_for(3ms);
                }
            }
        }
    });
 
    t1.join();
    t2.join();
}

Pro zájemce

Něco navíc

Podobně jako jsme implementovali synchronized<T>, které zaručovalo synchronizovaný přístup k nějakému T, můžeme naimplementovat concurrent<T>, které zaručí neblokující vykonávání kódu nad nějakým T. Specificky, kód se bude vykonávat na vlastním vláknu.

Řešení

1)
3. kategorie je v zásadě speciální varianta 2. kategorie
2)
Pokud kód zkompilujete v release módu, tj. s optimalizacemi, může se stát, že překladač usoudí, že counter++; counter–; nemá vlastně žádný význam a tento kód vyhodí.
3)
A std::unique_lock
courses/b6b36pcc/cviceni/cviceni-11.txt · Last modified: 2022/09/07 14:30 by nagyoing