Warning
This page is located in archive.

Použití vláken

Dnes si ukážeme, jak (potenciálně) zrychlit naše programy pomocí vláken. Pokud má náš stroj více procesorů, vlákna nám umožní vykonávat více práce najednou. Na dnešním cvičení začneme s prázdným .cpp souborem.

  • Vytvořte funkci, která 10000-krát vypíše “Ahoj, XXXXX! Jak se mas?\n” na standardní výstup.
  • Vytvořte funkci, která 10000-krát vypíše “Ahoj, OOOOO! Jak se mas?\n” na standardní výstup.
  • Spusťte obě funkce zároveň pomocí objektu std::thread, který v konstruktoru očekává funkci, kterou má provést.
  • Až budou obě funkce spuštěny, počkejte, až se obě dokončí pomocí metody .join().

Pokud je objekt typu std::thread zničen bez toho, aby se na něm zavolalo buďto .join() (čekej na dokončení) nebo .detach() (nečekej na dokončení), std::thread je krajně nespokojený – shodí nám program. Podobně jako jinde v STL je možné vložit do konstruktoru std::thread cokoliv, co připomíná funkci – např. funkční objekt nebo lambdu.

Funkce, která je spuštěna ve vlastním vlákně, může mít parametry. Druhý a další parametr konstruktoru std::thread slouží k tomu, aby tyto parametry předal.

  • Upravte funkce, aby dostávaly parametrem řetězec, který použije jako osloveni místo “XXXXX” a “OOOOO”.
  • Funkce sjednoťte do jedné lambda funkce, abyste omezili míru opakování kódu. Vyzkoušejte si uložit lambda funkci do proměnné – typ lambdy nelze napsat, takže je nutno použít auto.

Řešení

Co se teď stane, když spustíme program? Vidíme, že některé výpisy jsou teď navzájem pomíchané. Důvodem je, že teď každá iterace v každém vlákně použije třikrát výpis na standardní výstup, a tyto tři použití se mohou navzájem libovolně střídat.

  • Vytvořte objekt typu std::mutex, který budou obě vlákna sdílet, ale pouze jedno vlákno může mutex zamknout – druhé vlákno musí počkat.
  • Zpřístupněte objekt typu std::mutex funkci vlákna. Do capture listu lambda funkce přidejte referenci na mutex, nebo pošlete referenci na mutex dalším parametrem.
  • V každé iteraci před výpisem zamkněte mutex pomocí objektu typu std::unique_lock<std::mutex>. V destruktoru std::unique_lock se mutex sám odemkne.

Nyní se nám výpisy přestanou prolínat, protože druhé vlákno nesmí začít vypisovat, dokud první vlákno nedokončí řádek.

Řešení

Uváznutí (deadlock)

Zkusme si pustit následující program:

#include <thread>
#include <mutex>
#include <iostream>
 
int main() {
    std::mutex m1;
    std::mutex m2;
 
    std::thread t1([&m1, &m2](){
        for (int i = 0; i < 1000000; ++i) {
            std::unique_lock<std::mutex> l1(m1);
            std::cout << "Vlakno 1 rika: ";
            std::cout.flush(); // vynutí výpis
            std::unique_lock<std::mutex> l2(m2);
            std::cout << "ahoj!\n";
        }
    });
    std::thread t2([&m1, &m2]() {
        for (int i = 0; i < 1000000; ++i) {
            std::unique_lock<std::mutex> l2(m2);
            std::cout << "Vlakno 2 rika: ";
            std::cout.flush(); // vynutí výpis
            std::unique_lock<std::mutex> l1(m1);
            std::cout << "ahoj!\n";
        }
    });
 
    t1.join();
    t2.join();
}

Co se stane po jeho spuštění? Jak k tomu dojde? Jak to vyřešit?

Livelock

Zkusme si pustit následující program:

#include <string>
#include <mutex>
#include <thread>
#include <chrono>
#include <iostream>
 
using namespace std::chrono_literals;
 
int main() {
    std::mutex m1;
    std::mutex m2;
 
    std::thread t1([&m1, &m2](){
        int count = 0;
        for (int i = 0; i < 1000000; ++i) {
            std::unique_lock<std::mutex> l1(m1);
            for (int j = 0; j < 5; ++j) {
                if (m2.try_lock()) {
                    std::cout << "Vlakno 1: " << ++count << '\n';
                    m2.unlock();
                    break;
                }
                else {
                    std::this_thread::sleep_for(2ms);
                }
            }
        }
    });
    std::thread t2([&m1, &m2]() {
        int count = 0;
        for (int i = 0; i < 1000000; ++i) {
            std::unique_lock<std::mutex> l2(m2);
            for (int j = 0; j < 7; ++j) {
                if (m1.try_lock()) {
                    std::cout << "Vlakno 2: " << ++count << '\n';
                    m1.unlock();
                    break;
                }
                else {
                    std::this_thread::sleep_for(3ms);
                }
            }
        }
    });
 
    t1.join();
    t2.join();
}

Co se stane po jeho spuštění? Jak k tomu dojde? Jak to vyřešit?

Race condition

Zkusme si pustit následující program:

#include <thread>
#include <iostream>
 
int main() {
    int counter = 0;
 
    auto thread_func = [&counter]() {
        for (int i = 0; i < 10000000; ++i) {
            counter++;
            counter--;
        }
    };
 
    std::thread t1(thread_func);
    std::thread t2(thread_func);
    t1.join();
    t2.join();
 
    std::cout << counter << '\n';
}

Co se stane po jeho spuštění? Jak k tomu dojde? Jak to vyřešit?

  • Vyřešte situaci pomocí zamykání std::mutex.
  • Vyřešte situaci náhradou typu proměnné counter za std::atomic<int>.
  • Vyřešte situaci vlastní třídou atomic_counter, která má metody .value(), .increment() a .decrement() a která obsahuje int a std::mutex.

Řešení

Vyzkoušejte třídu atomic_counter na této funkci main:

int main() {
    atomic_counter counter;
 
    auto thread_func = [&counter]() {
        while (counter.value() < 5) {
            counter.increment();
        }
    };
 
    std::thread t1(thread_func);
    std::thread t2(thread_func);
    t1.join();
    t2.join();
 
    std::cout << counter.value() << '\n';
}

Co je špatně s touto funkcí main? Co s tím?

  • Přidejte do atomic_counter metodu .increment_if_less_than(int).

Vidíte nějakou vadu v našem řešení problému? Představte si, že chceme, aby vlákna provedla nějakou akci přesně 5-krát. Co s tím?

  • Přidejte metodě increment_if_less_than návratovou hodnotu typu bool, která nám sdělí, zda se inkrementace povedla.

Řešení

Asi tušíte, že pořád naše řešení není příliš obecné. Vždy, když budeme chtít, aby počítadlo umělo něco nového, zjistíme, že budeme potřebovat novou metodu. Co když budeme chtít přičítat víc, než 1? Co když budeme chtít odčítat? Co když budeme chtít něco provést, pouze pokud je hodnota počítadla dělitelná třemi? Možnosti jsou různorodé a každé použití by si vyžádalo novou metodu.

  • Najděte způsob, jak vyhodnotit v atomic_counter libovolnou podmínku a v závislosti na úspěchu provést libovolnou akci.

Řešení

Použití vláken

Podíváme se na použití vláken pro složitější úlohy. Většina vláknových problémů se dá napasovat na dobře známé a řešené problémy. My se podíváme na implementaci producent-consumer problému, kde alespoň dvě vlákna sdílejí frontu s omezenou kapacitou.

Náš producent bude vytvářet čísla - inty a náš spotřebitel je bude z fronty konzumovat. Pokud chceme, můžeme si představit tato čísla jako indexy do jiné, nesynchronizované kolekce (Alternativně můžeme později nasadit šablony).

Základní schéma je jednoduché

class blocking_bounded_queue {
    void add(const ham& h);
    const ham& take(const ham& h);
};
 
blocking_bounded_queue shared_queue;
 
void producer() {
    while (true) {
        auto ham = produce_ham();
        shared_queue.add(ham);
    }
}
 
void consumer() {
    while (true) {
        auto ham = shared_queue.take();
        eat_ham(ham);
    }
}
ale tímto jsme pouze přesunuli složitost dovnitř blocking_bounded_queue, do funkcí add a take. Takže, jak naimplementovat blocking_bounded_queue?

  • Určitě do objektu nesmíme pustit více vláken zároveň.
  • Určitě musíme počítat kapacitu
  • Určitě musíme počítat kolik máme prvků
  • Musíme mít kam prvky ukládat

Dále máme i několik méně očividných požadavků, například chceme, aby volání funkce take tzv. blokovalo (nedojde k návratu z funkce) dokud nelze z fronty něco vybrat a volání funkce add blokovalo dokud nelze do fronty prvek přidat. To vyplývá ze struktury producenta a spotřebitele, kteří předpokládají, že tyto funkce prostě proběhnou. To znamená, že funkce take a add musejí mít na čem čekat (například std::condition_variable).

Zde je kostra blocking_bounded_queue, splňující první 4 podmínky. Jak doimplementujete funkce add a take, aby splňovala i poslední podmínku?

#include <mutex>
#include <queue>
class blocking_bounded_queue {
public:
    void add(int i) {
        std::unique_lock<std::mutex> h {m}; // zabrani vstupu vice vlaken
        if (q.size() < capacity){
            q.push(i);
        } else {
            // ?
        }
    }
 
    int take() {
        std::unique_lock<std::mutex> h {m}; // zabrani vstupu vice vlaken
        if (q.size() > 0){
            int temp = q.front(); q.pop();
            return temp;
        } else {
            // ?
        }
    }
 
private:
    std::queue<int> queue;
    std::size_t size_limit;
    std::mutex mutex;
};

Řešení

Monitor

Trik, který jsme použili abychom umožnili provést bezpečně libovolnou akci nad počítadlem, se dá generalizovat pro arbitrární typ. Takové šabloně se občas říká monitor.

Jak se taková šablona implementuje?

Řešení

courses/a7b36pjc/cviceni/cviceni_11.txt · Last modified: 2016/12/21 23:51 by horenmar