Table of Contents

Použití vláken

Dnes si ukážeme, jak (potenciálně) zrychlit naše programy pomocí vláken. Pokud má náš stroj více procesorů, vlákna nám umožní vykonávat více práce najednou. Na dnešním cvičení začneme s prázdným .cpp souborem.

Pokud je objekt typu std::thread zničen bez toho, aby se na něm zavolalo buďto .join() (čekej na dokončení) nebo .detach() (nečekej na dokončení), std::thread je krajně nespokojený – shodí nám program. Podobně jako jinde v STL je možné vložit do konstruktoru std::thread cokoliv, co připomíná funkci – např. funkční objekt nebo lambdu.

Funkce, která je spuštěna ve vlastním vlákně, může mít parametry. Druhý a další parametr konstruktoru std::thread slouží k tomu, aby tyto parametry předal.

Řešení

Co se teď stane, když spustíme program? Vidíme, že některé výpisy jsou teď navzájem pomíchané. Důvodem je, že teď každá iterace v každém vlákně použije třikrát výpis na standardní výstup, a tyto tři použití se mohou navzájem libovolně střídat.

Nyní se nám výpisy přestanou prolínat, protože druhé vlákno nesmí začít vypisovat, dokud první vlákno nedokončí řádek.

Řešení

Uváznutí (deadlock)

Zkusme si pustit následující program:

#include <thread>
#include <mutex>
#include <iostream>
 
int main() {
    std::mutex m1;
    std::mutex m2;
 
    std::thread t1([&m1, &m2](){
        for (int i = 0; i < 1000000; ++i) {
            std::unique_lock<std::mutex> l1(m1);
            std::cout << "Vlakno 1 rika: ";
            std::cout.flush(); // vynutí výpis
            std::unique_lock<std::mutex> l2(m2);
            std::cout << "ahoj!\n";
        }
    });
    std::thread t2([&m1, &m2]() {
        for (int i = 0; i < 1000000; ++i) {
            std::unique_lock<std::mutex> l2(m2);
            std::cout << "Vlakno 2 rika: ";
            std::cout.flush(); // vynutí výpis
            std::unique_lock<std::mutex> l1(m1);
            std::cout << "ahoj!\n";
        }
    });
 
    t1.join();
    t2.join();
}

Co se stane po jeho spuštění? Jak k tomu dojde? Jak to vyřešit?

Livelock

Zkusme si pustit následující program:

#include <string>
#include <mutex>
#include <thread>
#include <chrono>
#include <iostream>
 
using namespace std::chrono_literals;
 
int main() {
    std::mutex m1;
    std::mutex m2;
 
    std::thread t1([&m1, &m2](){
        int count = 0;
        for (int i = 0; i < 1000000; ++i) {
            std::unique_lock<std::mutex> l1(m1);
            for (int j = 0; j < 5; ++j) {
                if (m2.try_lock()) {
                    std::cout << "Vlakno 1: " << ++count << '\n';
                    m2.unlock();
                    break;
                }
                else {
                    std::this_thread::sleep_for(2ms);
                }
            }
        }
    });
    std::thread t2([&m1, &m2]() {
        int count = 0;
        for (int i = 0; i < 1000000; ++i) {
            std::unique_lock<std::mutex> l2(m2);
            for (int j = 0; j < 7; ++j) {
                if (m1.try_lock()) {
                    std::cout << "Vlakno 2: " << ++count << '\n';
                    m1.unlock();
                    break;
                }
                else {
                    std::this_thread::sleep_for(3ms);
                }
            }
        }
    });
 
    t1.join();
    t2.join();
}

Co se stane po jeho spuštění? Jak k tomu dojde? Jak to vyřešit?

Race condition

Zkusme si pustit následující program:

#include <thread>
#include <iostream>
 
int main() {
    int counter = 0;
 
    auto thread_func = [&counter]() {
        for (int i = 0; i < 10000000; ++i) {
            counter++;
            counter--;
        }
    };
 
    std::thread t1(thread_func);
    std::thread t2(thread_func);
    t1.join();
    t2.join();
 
    std::cout << counter << '\n';
}

Co se stane po jeho spuštění? Jak k tomu dojde? Jak to vyřešit?

Řešení

Vyzkoušejte třídu atomic_counter na této funkci main:

int main() {
    atomic_counter counter;
 
    auto thread_func = [&counter]() {
        while (counter.value() < 5) {
            counter.increment();
        }
    };
 
    std::thread t1(thread_func);
    std::thread t2(thread_func);
    t1.join();
    t2.join();
 
    std::cout << counter.value() << '\n';
}

Co je špatně s touto funkcí main? Co s tím?

Vidíte nějakou vadu v našem řešení problému? Představte si, že chceme, aby vlákna provedla nějakou akci přesně 5-krát. Co s tím?

Řešení

Asi tušíte, že pořád naše řešení není příliš obecné. Vždy, když budeme chtít, aby počítadlo umělo něco nového, zjistíme, že budeme potřebovat novou metodu. Co když budeme chtít přičítat víc, než 1? Co když budeme chtít odčítat? Co když budeme chtít něco provést, pouze pokud je hodnota počítadla dělitelná třemi? Možnosti jsou různorodé a každé použití by si vyžádalo novou metodu.

Řešení

Použití vláken

Podíváme se na použití vláken pro složitější úlohy. Většina vláknových problémů se dá napasovat na dobře známé a řešené problémy. My se podíváme na implementaci producent-consumer problému, kde alespoň dvě vlákna sdílejí frontu s omezenou kapacitou.

Náš producent bude vytvářet čísla - inty a náš spotřebitel je bude z fronty konzumovat. Pokud chceme, můžeme si představit tato čísla jako indexy do jiné, nesynchronizované kolekce (Alternativně můžeme později nasadit šablony).

Základní schéma je jednoduché

class blocking_bounded_queue {
    void add(const ham& h);
    const ham& take(const ham& h);
};
 
blocking_bounded_queue shared_queue;
 
void producer() {
    while (true) {
        auto ham = produce_ham();
        shared_queue.add(ham);
    }
}
 
void consumer() {
    while (true) {
        auto ham = shared_queue.take();
        eat_ham(ham);
    }
}
ale tímto jsme pouze přesunuli složitost dovnitř blocking_bounded_queue, do funkcí add a take. Takže, jak naimplementovat blocking_bounded_queue?

Dále máme i několik méně očividných požadavků, například chceme, aby volání funkce take tzv. blokovalo (nedojde k návratu z funkce) dokud nelze z fronty něco vybrat a volání funkce add blokovalo dokud nelze do fronty prvek přidat. To vyplývá ze struktury producenta a spotřebitele, kteří předpokládají, že tyto funkce prostě proběhnou. To znamená, že funkce take a add musejí mít na čem čekat (například std::condition_variable).

Zde je kostra blocking_bounded_queue, splňující první 4 podmínky. Jak doimplementujete funkce add a take, aby splňovala i poslední podmínku?

#include <mutex>
#include <queue>
class blocking_bounded_queue {
public:
    void add(int i) {
        std::unique_lock<std::mutex> h {m}; // zabrani vstupu vice vlaken
        if (q.size() < capacity){
            q.push(i);
        } else {
            // ?
        }
    }
 
    int take() {
        std::unique_lock<std::mutex> h {m}; // zabrani vstupu vice vlaken
        if (q.size() > 0){
            int temp = q.front(); q.pop();
            return temp;
        } else {
            // ?
        }
    }
 
private:
    std::queue<int> queue;
    std::size_t size_limit;
    std::mutex mutex;
};

Řešení

Monitor

Trik, který jsme použili abychom umožnili provést bezpečně libovolnou akci nad počítadlem, se dá generalizovat pro arbitrární typ. Takové šabloně se občas říká monitor.

Jak se taková šablona implementuje?

Řešení