Cvičení 11: Vlákna a paralelismus

Práce s jedním vláknem bývá pohodlnější, snažší a bezpečnější nežli práce s více vlákny, ale často je stejně potřeba jich použít víc.

Použití vláken se dá nahrubo rozdělit do 3 kategorií

  • Rozdělení stejné práce mezi N vláken
  • Rozdělení zpracování dat do N kroků (Jedno vlákno načítá data, další je zpracovává)
  • Zmenšení latence zpracování vstupů (GUI má obvykle separátní vlákno)

V tomto My si vyzkoušíme jednoduché varianty prvních dvou kategorií 1) za pomoci nástrojů z C++ standardní knihovny.

Na tomto cvičení začneme s prázdným .cpp souborem.

Počítadlo

Začneme s jednoduchým příkladem, počítadlem, jako zástupcem 1. kategorie. Tento kód by měl logicky vypsat 0, ale často to tak není.

Proč? Co vypíše na vašem počítači?2)

#include <thread>
#include <iostream>
 
int main() {
    int counter = 0;
 
    auto thread_func = [&counter]() {
        for (int i = 0; i < 10000000; ++i) {
            counter++;
            counter--;
        }
    };
 
    std::thread t1(thread_func);
    std::thread t2(thread_func);
    t1.join();
    t2.join();
 
    std::cout << counter << '\n';
}

  • Jak toto opravit pomocí std::mutex3)?
  • Jak toto opravit pomocí std::atomic<int>?
  • Jak toto opravit vlastní třídou atomic_counter?
    • Datovými položkami by měl být vlastní counter a mutex.
    • Implementujte metody int value() a void increment().

Řešení

Další častý problém je, že i pokud jsou jednotlivé kroky samostatně atomické, nemusíme z nich být schopni postavit kompletní algoritmus. Zkuste si spustit tento útržek:

#include <thread>
#include <iostream>
#include <vector>
 
int main() {
    atomic_counter counter;
 
    auto thread_func = [&counter] () {
        while (counter.value() < 100) {
            counter.increment();
        }
    };
 
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i) {
        threads.emplace_back(thread_func);
    }
    for (auto& thread : threads) {
        thread.join();
    }
 
    std::cout << counter.value() << '\n';
}

Pravděpodobně vám vypíše 100, tak jak byste čekali. Toto krásně ilustruje problém s vlákny, protože tento výsledek nemusíte dostat vždy.

  • Přidejte vašemu počítadlu metodu bool increment_if_less_than(int), která tento problém opraví.
  • Můžeme nějak vyhodnotit libovolnou podmínku a provést libovolnou akci, aniž bychom pokaždé přidávali novou metodu?

Fronta

2. kategorie vícevláknových kódů je postavena na frontách. Protože C++ standardní knihovna neposkytuje více-vláknovou frontu, postavíme si jednu na cvičení. Fronta, kterou budeme programovat, je tzv. blokující. To znamená, že pokud se z ní nějaké vlákno pokusí vytáhnout prvek ve chvíli kdy je prázdná, tak danému vláknu bude přerušena exekuce dokud do fronty není nějaký prvek vložen.

Začneme s kostrou fronty, která sice je thread-safe, ale pokud je prázdná, tak dojde k chybě.

#include <queue>
#include <mutex>
#include <thread>
#include <iostream>
#include <chrono>
 
template <typename T>
class blocking_queue {
public:
    void add(T const& e) {
        std::unique_lock<std::mutex> lg(mutex);
        queue.push(e);
    }
 
    T take() {
        std::unique_lock<std::mutex> lg(mutex);
        auto out = std::move(queue.front()); queue.pop();
        return out;
    }
 
private:
    std::queue<T> queue;
    std::mutex mutex;
};
 
int main() {
    using namespace std::chrono_literals;
    blocking_queue<int> queue;
    auto producer = [] (blocking_queue<int>& queue) {
        for (int i = 0; i < 100; ++i) {
            queue.add(i);
            std::this_thread::sleep_for(100ms);
        }
    };
 
    auto consumer = [] (blocking_queue<int>& queue) {
        while (true) {
            auto elem = queue.take();
            std::cout << elem << '\n';
        }
    };
 
    std::thread t1(producer, std::ref(queue));
    std::thread t2(consumer, std::ref(queue));
 
    t1.join();
    t2.join();
}

  • Opravte frontu tak, aby se vlákno volající take zablokovalo, pokud je fronta prázdná.

K čekání se vám bude hodit std::condition_variable.

Řešení

Teď již nedochází k chybě pokud se volá take() na prázdné frontě, ale dochází k jinému problému: program se nikdy neukončí, protože consumer vlákno nikdy nedoběhne. Vyřešíme to tak, že zavedeme koncept zavírání fronty – do zavřené fronty nelze vkládat další prvky a volání take signalizuje jestli byl získán prvek, nebo jestli je fronta zavřená a prázdná.

  • Přidejte frontě proměnnou closing a metodu void close(), která ji nastaví na true a probudí všechna vlákna
  • Zmeňte metodu add tak aby signalizovala jestli ke vložení opravdu došlo, a v případě zavřené fronty prvek nevkládala
  • Podobným způsobem změňte take
  • Přidejte do funkce producer volání metody close
  • Upravte funkci consumer tak aby se čtecí smyčka ukončila když je fronta zavřená

Řešení

Naše nová fronta teď umí signalizovat konzumentům kdy už z ní nejde nic získat, takže náš program časem doběhne. Stále ale ještě budeme chtít udělat jednu změnu, specificky budeme chtít, aby fronta nemohla být příliš veliká – pokud by se třeba zasekl konzument, nechceme aby producent přidával nové prvky do nekonečna.

  • Přidejte frontě konstruktor, který bere maximální velikost
  • Přidejte do add kontrolu velikosti, která případně zablokuje vkládající vlákno
  • Zkontrolujte zda close probudí i vlákna uspaná uvnitř add
  • Přejmenujte frontu na blocking_bounded_queue

Řešení

Námět k zamyšlení: Pokud bude do naší fronty zapisovat více vláken a číst z ní bude též více vláken, dojde k chybě?

Synchronized<T>

Trik který jsme použili pro exekuci arbitrární logiky uvnitř atomic_counter se dá zobecnit do šablony na arbitrárním typem. Zkusíme si tedy teď vytvořit třídu pojmenovanou synchronized, která nám umožní synchronizovat arbitrární exekuci nad libovolnou třídou.

  • Vytvořte šablonovou třídu synchronized<T>
  • Použijte ji k synchronizaci více-vláknového zápisu na std::cout

Řešení

Ostatní problémy

V prvním příkladu jsme měli problém, kdy jsme nedostatečně synchronizovali zápis do stejné paměti. Tomuto problému se říká race condition a patří mezi časté potíže při použití více vláken. Každý kdo pracuje s více vlákny, by ještě měl vědět o tzv. uváznutí (deadlock) a livelocku.

Deadlock znamená, že program nepostupuje a vlákna nikdy nedoběhnou. Dochází k němu když na sebe vzájemně čeká několik vláken, jako například v tomto útržku:

#include <thread>
#include <mutex>
#include <iostream>
 
int main() {
    std::mutex m1;
    std::mutex m2;
 
    std::thread t1([&m1, &m2](){
        for (int i = 0; i < 1000000; ++i) {
            std::unique_lock<std::mutex> l1(m1);
            std::cout << "Vlakno 1 rika: ";
            std::cout.flush(); // vynutí výpis
            std::unique_lock<std::mutex> l2(m2);
            std::cout << "ahoj!\n";
        }
    });
    std::thread t2([&m1, &m2]() {
        for (int i = 0; i < 1000000; ++i) {
            std::unique_lock<std::mutex> l2(m2);
            std::cout << "Vlakno 2 rika: ";
            std::cout.flush(); // vynutí výpis
            std::unique_lock<std::mutex> l1(m1);
            std::cout << "ahoj!\n";
        }
    });
 
    t1.join();
    t2.join();
}

Na Livelock se můžeme dívat jako na slabší verzi uváznutí. Vlákna sice postupují, ale program pobíhá signifikantně pomaleji, nežli by probíhal v jednom vláknu. K livelocku dochází například pokud se více vláken snaží zamknout stejné mutexy, ale po čase je zase odemkne, jako například v ukázce:

#include <string>
#include <mutex>
#include <thread>
#include <chrono>
#include <iostream>
 
using namespace std::chrono_literals;
 
int main() {
    std::mutex m1;
    std::mutex m2;
 
    std::thread t1([&m1, &m2](){
        int count = 0;
        for (int i = 0; i < 1000000; ++i) {
            std::unique_lock<std::mutex> l1(m1);
            for (int j = 0; j < 5; ++j) {
                if (m2.try_lock()) {
                    std::cout << "Vlakno 1: " << ++count << '\n';
                    m2.unlock();
                    break;
                }
                else {
                    std::this_thread::sleep_for(2ms);
                }
            }
        }
    });
    std::thread t2([&m1, &m2]() {
        int count = 0;
        for (int i = 0; i < 1000000; ++i) {
            std::unique_lock<std::mutex> l2(m2);
            for (int j = 0; j < 7; ++j) {
                if (m1.try_lock()) {
                    std::cout << "Vlakno 2: " << ++count << '\n';
                    m1.unlock();
                    break;
                }
                else {
                    std::this_thread::sleep_for(3ms);
                }
            }
        }
    });
 
    t1.join();
    t2.join();
}

Extra

Podobně jako jsme implementovali synchronized<T>, které zaručovalo synchronizovaný přístup k nějakému T, můžeme naimplementovat concurrent<T>, které zaručí neblokující vykonávání kódu nad nějakým T. Specificky, kód se bude vykonávat na vlastním vláknu.

Řešení

1)
3. kategorie je v zásadě speciální varianta 2. kategorie
2)
Pokud kód zkompilujete v release módu, tj. s optimalizacemi, může se stát, že překladač usoudí, že counter++; counter–; nemá vlastně žádný význam a tento kód vyhodí.
3)
A std::unique_lock
courses/b6b36pjc/cviceni/cviceni-11.txt · Last modified: 2018/12/19 16:56 by jerabma7