6. Vlákna v C++

Podklady pro cvičení

Vzorové příklady najdete v repozitáři tutorials v adresáři tut06

git pull
cd tut06

Alternativně lze najít kódy také v archivu: ppc-tut06.zip

Template třídy pro hru Logic

Vznik vlákna

Vzniká vytvoření instance třídy std::thread. Běh samotného vlákna bez další funkcionality by nebyl příliš užitečný, proto je argumentem konstruktoru je v minimální variantě ukazatel na funkci, která bude ve vlákně spuštěna.

void funkce () {
    std::cout << "funkce ve vlakne\n";
}
 
int main(){
    std::thread t1 (funkce);
 
    return 0;
}

Program sice vypíše textovou informaci na standardní výstup, ale běh programu skončí chybou.

nove vlakno
terminate called without an active exception

Po vytvoření vlákna v rámci scope (jmenného prostoru) funkce main() je třeba zajistit správné volání destruktoru nového vlákna. Máme následující možnosti:

  • počkat v hlavní vlákně na ukončení nového vlákna zavoláním funkce join()
  • nechat nové vlákno běžet samostatně, voláním funkce detach(). Toto vlákno se spustí a ukončí samo. (Tento přístup může způsobit problémy, zejmén v případě, že vlákno pracuje s proměnnými v novém scope. Lépe nepoužívat.)

std::thread t1(funkce);
t1.join();

Dalšími argumenty kontruktoru vlákna mohou být argumenty předávané funkci.

void funkce2 (int arg) {
    std::cout << "nove vlakno s argumentem arg = " << arg << "\n";
}
 
std::thread t2(funkce2, 100);
t2.join();

Argumentem funkce může být i ukazatel, pak je možné předat stav proměnné.

void funkce3 (int * arg) {
    *arg = 33;
}
 
int x;
std::thread t3(funkce3, &x);

V případě, že je potřeba argument předat jako referenci, lze využít funkci std::ref()

void funkce4 (int & arg)
{
   arg = 33;
}
 
int y;
std::thread t4(funkce4, std::ref(y));

Argumentem konstruktoru může být libovolný funkční objekt.

struct A {
    bool operator()(int arg) {
        std::cout << "[A]: " << arg << std::endl;
    }
}
 
A obj;
std::thread t5 (obj, 10);
t5.join();

Synchronizace

V následujícím příkladu budou spuštěna dvě vlákna, která budou opakovaně vypisovat na standardní výstup posloupnost čísel.

void funkce1() {
    for (int i = 0; i < 100; i++)
        std::cout << "[1] = " << i << std::cout;
}
 
void funkce2() {
    for (int i = -100; i < 0; i++)
        std::cout << "[2] = " << i << std::cout;
}
 
std::thread t1(funkce1);
std::thread t2(funkce2);
t1.join();
t2.join(); 

Po spuštění programu se ukáže, že výpis je značně chaotický. Pokud aplikace obsahuje více vláken, které přistupují ke společnému zdroji (paměť, soubor, standardní I/O, …), je třeba vhodným způsobem zařídit exkluzivní přístup. K tomu slouží zámek, v terminologii vláknového programování nazývané mutex, v C++11 implementovaný jako std::mutex.

Následující kód využivá k přístupu do kritické sekce programu (tj. část kódu, kde probíhá přístup na standardní výstup) globální mutex.

std::mutex m;
 
void funkce1() {
    ...
    m.lock();
    std::cout << "[1] = " << i << std::endl;
    m.unlock();
    ...
} 

V následujícím příkladu přistupují dvě vlákna voláním jednoho lambda výrazu k jedné proměnné. Navíc je demonstrován RAII princip správy zdroje - šablona std::unique_lock vytváří wrapper kolem zámku. Zámek je při konstrukci automaticky uzamče, později může být explicitně uzamčen nebo odemknut, při destrukci objektu/vlákna je zámek automaticky uvolněn.

int counter = 0;
std::mutex m;
auto funkce3 = [&counter, &m]() {
    for (int i=0; i < 10000000; ++i) {
        std::unique_lock<std::mutex> lock(m);
        counter++;
        counter--;
    }
};
 
auto t3 = std::thread(funkce3);
auto t4 = std::thread(funkce3);
t3.join(); 
t4.join();
 
std::cout << counter << std::endl;

V případě potřeby potlačit automatické uzamčení zámku lze použít std::defer_lock jako druhý argument konstruktoru

std::unique_lock<std::mutex> lock(m, std::defer_lock);

Zámky je možné pomocí funkce std::lock() zamykat i hromadně. Funkce je definována jako variadická šablona, takže může mít proměnný počet argumentů.

// 1.
std::mutex m1, m2;
std::lock (m1, m2);
// 2.
std::unique_lock<std::mutex> lk1(m1, std::defer_lock);
std::unique_lock<std::mutex> lk2(m2, std::defer_lock);
std::lock(lk1, lk2);

Dalšími možnostmi automatické správy mutexu jsou std::lock_guard (bez možnosti explicitního uzamčení nebo odemčení zámku), nebo případně std::scoped_lock.

Deadlock

Používání synchronizace vlákem pomocí zámků přináší řadu potenciálních problémů. Jedním z nich je deadlock, stav, ve kterém program nepokračuje, protože na sebe vlákna navzájem čekají. K deadlocku obvykle dojde, pokud se více vláken pokusí zamknout stejné mutexy v různém pořadí.

Představme si následující situaci: nechť třída Krabice popisuje krabičku s kuličkami. Z krabičky je možné kuličky odebírat nebo je naopak přidávat. K dosažení reálnosti situace (a deadlocku :-), zakomponujeme do funkce malé zpoždění, které reprezentuje časovou režii na přenos kuliček. Aby byl zajištěn exkluzivní přístup k atributu (tj. proměnná reprezentující počet kuliček), obsahuje třída vlastní mutex, kterým je možno uzamknout kritickou sekci programu (změna počtu kuliček). Nyní mějme dvě krabičky, mezi kterými budeme vyměňovat určitý počet kuliček, k přesunu slouží funkce transfer s následujícími argumenty: reference na zdrojový objekt (krabičku), cílový objekt (krabičku) a počet kuliček num pro přenos.

#include <iostream>
#include <thread>
#include <mutex>
 
struct Krabice {
    explicit Krabice (int pocet) : kulicky (pocet) {}
    int kulicky;
    std::mutex m;
};
 
void transfer (Krabice &from, Krabice &to, int num) {
 
    std::lock_guard<std::mutex> l1(from.m);
    std::cout << "from -> " << num << "\n";
    from.kulicky -= num;
 
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
 
    std::lock_guard<std::mutex> l1(to.m);
    std::cout << "to <- " << num << "\n";
    to.kulicky += num;
}
 
int main() {
    Krabice a(10), b(5);
    std::thread t1 (transfer, std::ref(a), std::ref(b), 3);
    std::thread t2 (transfer, std::ref(b), std::ref(a), 3);
 
    t1.join();
    t1.join();
 
    return 0;
}

Program se po spuštění zastaví v okamžiku, kdy se vlákno pokusí zamknout již zamčený mutex. Deadlock se dá vyřešit např. použitím std::lock, odemčení mutexu ve vhodný okamžik lze zařídít uvedením std::adopt_lock jako druhého argumentu std::lock_guard.

// zajistí správné zamčení zámku bez deadlocku
std::lock (from.m, to.m);
 
std::lock_guard<std::mutex> l1(from.m, std::adopt_lock);
std::cout << "from -> " << num << "\n";
from.kulicky -= num;
 
std::this_thread::sleep_for(std::chrono::milliseconds(10));
 
std::lock_guard<std::mutex> l2(to.m, std::adopt_lock);
std::cout << "to <- " << num << "\n";
to.kulicky += num;

Podmínkové proměnné

Podmínkové proměnné (std::condition_variable) slouží ke komunikaci mezi vlákny. Jsou vždy spojeny s nějakým mutexem. Na podmínkové proměnné mohou vlákna čekat na signál od jiných vláken. S čekáním na podmínkové proměnné vždy souvisí predikát, který kontroluje, jestli se vlákno mělo probudit – vlákno se mohlo probudit i samovolně, nebo nemusí být schopno v současném stavu systému pokračovat.

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
 
std::condition_variable cv;
std::mutex cv_m;
 
int i = 0;
 
void waits() {
    std::unique_lock<std::mutex> lk(cv_m);
    std::cerr << "Waiting... \n";
    cv.wait (lk, [](){return i == 1;});
    std::cerr << "...finished waiting. i == 1\n";
}
 
void signals() {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cerr << "Notifying...\n";
    cv.notify_all();
 
    std::this_thread::sleep_for(std::chrono::seconds(1));
    i = 1;
    std::cerr << "Notifying again...\n";
    cv.notify_all();
}
 
int main() {
    std::thread t1 (waits);
    std::thread t2 (signals);
 
    t1.join();
    t2.join();
 
    return 0;
}

Asynchronní volání, future a promise

Asynchronní volání std::async vykoná kód funkce nezávisle na vláknu, ze kterého byla funkce spuštěna. Protože vlákna nemohou vracet návratové hodnoty, výsledek asynchronního volání je k dispozici v proměnných typu std::future. Na rozdíl od vláken, vhodných spíše pro komplexnější výpočty, asynchronní volání jsou vhodná pro jednorázové spuštění funkce, u které není třeba mít okamžitý výsledek (a program může mezitím dělat jiný výpočet).

int funkce();
 
int main() {
    std::future<int> a = std::async(funkce);
    // další výpočty
    // pokud ještě není k dispozici návratová hodnota funkce, program zde počká 
    std::cout << "a = " << a.get() << std::endl;
}

Trochu komplikovanější, ale zato s větší kontrolou nad vlákny, je použití std::promise, které můžotou do std::future zapisovat. V zásadě se jedná o princip producent-konzument - promise je úložiště, do kterého producent vloží hodnotu, kterou si konzument přes future vyzvedne.

void funkce (std::future<int> & fut) {
    int x = fut.get();
    std::cout << "value: " << x << '\n';
}
 
int main (){
    // vytvoření promise
    std::promise<int> prom;                      
    // propojení future a promise 
    std::future<int> fut = prom.get_future();
    // odeslání future do threadu
    std::thread t1 (print_int, std::ref(fut));  
    // inicializace promise 
    prom.set_value (10);                         
 
    th1.join();
 
    return 0;
} 

courses/b2b99ppc/tutorials/06.txt · Last modified: 2024/03/28 00:20 by nentvond