c++11新特性實戰 (一):多線程操作


c++11 新特性實戰 (一)

c++11多線程操作

線程

thread

int main()
{
    thread t1(Test1);
    t1.join();
    thread t2(Test2);
    t2.join();
    thread t3 = t1;
    thread t4(t1);
    thread t5 = std::move(t1);
    thread t6(std::move(t1));
    return 0;
}

t3,t4創建失敗,因為thread的拷貝構造和賦值運算符重載的原型是:

thread(const thread&) = delete;
thread& operator=(const thread&) = delete;

被禁用了,但是t5, t6線程是創建成功的。std::move把t1轉換為右值,調用的是函數原型為thread& operator=(thread&& _Other) noexceptthread(thread&& _Other) noexcept

當線程對象t1被移動拷貝和移動賦值給t5和t6的時候,t1就失去了線程控制權,也就是一個線程只能同時被一個線程對象所控制。最直觀的是t1.joinable()返回值為false,joinable()函數后面介紹。

使用類成員函數作為線程參數

class Task
{
public:
    Task(){}
    void Task1() {}
    void Task2() {}
private:
};

int main()
{
    Task task;
    thread t3(&Task::Task1, &task);
    t3.join();
    return 0;
}

關鍵點是要創建一個類對象,並作為第二個參數傳入thread()線程的構造函數中去。

管理當前線程的函數

yield

此函數的准確性為依賴於實現,特別是使用中的 OS 調度器機制和系統狀態。例如,先進先出實時調度器( Linux 的 SCHED_FIFO )將懸掛當前線程並將它放到准備運行的同優先級線程的隊列尾(而若無其他線程在同優先級,則 yield 無效果)。

#include <iostream>
#include <chrono>
#include <thread>
 
// 建議其他線程運行一小段時間的“忙睡眠”
void little_sleep(std::chrono::microseconds us)
{
    auto start = std::chrono::high_resolution_clock::now();
    auto end = start + us;
    do {
        std::this_thread::yield();
    } while (std::chrono::high_resolution_clock::now() < end);
}
 
int main()
{
    auto start = std::chrono::high_resolution_clock::now();
 
    little_sleep(std::chrono::microseconds(100));
 
    auto elapsed = std::chrono::high_resolution_clock::now() - start;
    std::cout << "waited for "
              << std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count()
              << " microseconds\n";
}

get_id

這個函數不用過多介紹了,就是用來獲取當前線程id的,用來標識線程的身份。

 std::thread::id this_id = std::this_thread::get_id();

sleep_for

位於this_thread命名空間下,msvc下支持兩種時間參數。

std::this_thread::sleep_for(2s);
std::this_thread::sleep_for(std::chrono::seconds(1));

sleep_untile

參數構建起來挺麻煩的,一般場景下要求線程睡眠的就用sleep_for就行了

using std::chrono::system_clock;
time_t tt = system_clock::to_time_t(system_clock::now());
struct std::tm *ptm = localtime(&tt);
 std::this_thread::sleep_until(system_clock::from_time_t(mktime(ptm)));

互斥

mutex

對於互斥量看到一個很好的比喻:

單位上有一台打印機(共享數據a),你要用打印機(線程1要操作數據a),同事老王也要用打印機(線程2也要操作數據a),但是打印機同一時間只能給一個人用,此時,規定不管是誰,在用打印機之前都要向領導申請許可證(lock),用完后再向領導歸還許可證(unlock),許可證總共只有一個,沒有許可證的人就等着在用打印機的同事用完后才能申請許可證(阻塞,線程1lock互斥量后其他線程就無法lock,只能等線程1unlock后,其他線程才能lock),那么,這個許可證就是互斥量。互斥量保證了使用打印機這一過程不被打斷。

代碼示例:

mutex mtx;

int gNum = 0;
void Test1()
{
    mtx.lock();
    for(int n = 0; n < 5; ++n)
        gNum++;
    mtx.unlock();
}

void Test2()
{
    std::cout << "gNum = " << gNum << std::endl;
}

int main()
{
    thread t1(Test1);
    t1.join();
    thread t2(Test2);
    t2.join();
    return 0;
}

join()表示主線程等待子線程結束再繼續執行,如果我們的期望是打印循環自增之后的gNum的值,那t1.join()就放在t2創建之前調用。因為t2的創建就標志着t2線程創建好然后開始執行了

通常mutex不單獨使用,因為lock和unlock必須配套使用,如果忘記unlock很可能造成死鎖,即使unlock寫了,但是如果在執行之前程序捕獲到異常,也還是一樣會死鎖。如何解決使用mutex造成的死鎖問題呢?下面介紹unique_gard和lock_guard的時候詳細說明。

timed_mutex

提供互斥設施,實現有時限鎖定

std::mutex cout_mutex; // 控制到 std::cout 的訪問
std::timed_mutex mutex;

void job(int id)
{
    using Ms = std::chrono::milliseconds;
    std::ostringstream stream;

    for (int i = 0; i < 3; ++i) {
        if (mutex.try_lock_for(Ms(100))) {
            stream << "success ";
            std::this_thread::sleep_for(Ms(100));
            mutex.unlock();
        } else {
            stream << "failed ";
        }
        std::this_thread::sleep_for(Ms(100));
    }

    std::lock_guard<std::mutex> lock(cout_mutex);
    std::cout << "[" << id << "] " << stream.str() << "\n";
}

int main()
{
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i) {
        threads.emplace_back(job, i);
    }

    for (auto& i: threads) {
        i.join();
    }
}

這里的第28行衍生出一個知識點:STL的emplace_back函數。這是c++11新增的容器類的操作函數,如果第二個參數忽略,用法和push_back相似,都是在stl后面追加元素。函數原型:

template<class... _Valty>
decltype(auto) emplace_back(_Valty&&... _Val);

是一個變長的模板函數,例子中的代碼傳遞的是一個函數指針jobemplace_back的實現會把job傳遞給std::thread的構造函數,與push_back需要是std::thread類型作為參數不同,所以emplace_back是直接在容器中構造了要添加的元素,省去了再次把參數拷貝到stl中的過程,效率更高。目前來看還沒有什么副作用,所以推薦以后在使用stl的時候使用emplace_back取代push_back.

使用timed_mutex的時候也無法用unique_lock這樣的RAII機制來控制加解鎖操作,所以不同的互斥量的使用場景要區分清楚。在對的時候使用對的東西也是碼農進階的一個標志~

try_lock_until的例子:

void f()
{
    auto now=std::chrono::steady_clock::now();
    test_mutex.try_lock_until(now + std::chrono::seconds(10));
    std::cout << "hello world\n";
}

int main()
{
    std::lock_guard<std::timed_mutex> l(test_mutex);
    std::thread t(f);
    t.join();
}

recursive_mutex

提供能被同一線程遞歸鎖定的互斥設施

recursive_mutex 類是同步原語,能用於保護共享數據免受從個多線程同時訪問。

recursive_mutex 提供排他性遞歸所有權語義:

  1. 調用方線程在從它成功調用 locktry_lock 開始的時期里占有 recursive_mutex 。此時期間,線程可以進行對 locktry_lock 的附加調用。所有權的時期在線程調用 unlock 匹配次數時結束。

  2. 線程占有 recursive_mutex 時,若其他所有線程試圖要求 recursive_mutex 的所有權,則它們將阻塞(對於調用 lock )或收到 false 返回值(對於調用 try_lock )。

  3. 可鎖定 recursive_mutex 次數的最大值是未指定的,但抵達該數后,對 lock 的調用將拋出 std::system_error 而對 try_lock 的調用將返回 false 。

recursive_mutex 在仍為某線程占有時被銷毀,則程序行為未定義。 recursive_mutex 類滿足互斥體 (Mutex) 標准布局類型**(StandardLayoutType) 的所有要求。

#include <iostream>
#include <thread>
#include <string>
#include <mutex>
 
class X {
    std::recursive_mutex m;
    std::string shared;
  public:
    void fun1() {
      std::lock_guard<std::recursive_mutex> lk(m);
      shared = "fun1";
      std::cout << "in fun1, shared variable is now " << shared << '\n';
    }
    void fun2() {
      std::lock_guard<std::recursive_mutex> lk(m);
      shared = "fun2";
      std::cout << "in fun2, shared variable is now " << shared << '\n';
      fun1(); // 遞歸鎖在此處變得有用
      std::cout << "back in fun2, shared variable is " << shared << '\n';
    };
};
 
int main() 
{
    X x;
    std::thread t1(&X::fun1, &x);
    std::thread t2(&X::fun2, &x);
    t1.join();
    t2.join();
}

這里的 std::recursive_mutex m;如果修改成std::mutex m;程序運行的時候會直接崩潰,原因是兩個std::lock_guard使用的是同一個std::mutex,造成func1()加鎖的時候死鎖 。

std::recursive_mutexstd::mutex最顯著的區別是,前者對同一個互斥量重復加鎖,釋放的時候是遞歸釋放,所以在此場景中不會產生std::mutex一樣的死鎖問題。

recursive_timed_mutex

提供能被同一線程遞歸鎖定的互斥設施,並實現有時限鎖定

使用方法和std::recursive_mutex類似,不做詳細介紹~

shared_mutex(C++17)

shared_timed_mutex(C++14)

這兩個屬於更高標准的std包含的內容,感興趣的讀者自行了解~

通用互斥管理

lock_guard

void Test1()
{
    std::lock_guard<std::mutex> lg(mtx);
    for(int n = 0; n < 5; ++n)
    {
        gNum++;
        std::cout << "gNum = " << gNum << std::endl;
    }
}
int main()
{
    thread t1(Test1);
    thread t2(Test1);
    t1.join();
    t2.join();
    return 0;
}

lock_guard相當於利用RAII機制(“資源獲取就是初始化”)把mutex封裝了一下,在構造中lock,在析構中unlock。避免了中間過程出現異常導致的mutex不能夠正常unlock.自己在寫代碼的過程中也可以通過RAII機制封裝一個簡單的堆內存管理的類:

template<typename Type>
class WRaii
{
public:
    WRaii(const Type& value){
        m_Type = new Type(value);
    }
    ~WRaii(){
        if(m_Type)
            delete m_Type;
    }
    Type* operator ->() const
    {
        return m_Type;
    }
private:
    Type*   m_Type  = nullptr;
};

class Test
{
public:
    Test(int num) { m_Num = num; }
    void Do() { std::cout << __FUNCTION__ << std::endl;}
    int Num() { return m_Num; }
private:
    int m_Num = 5;
};

int main()
{
    WRaii<int> ra(5);//調用int的構造
    WRaii<Test> ra1(6);//調用Test的構造
    int res = ra1->Num();
}

其實這個例子不太合適,這樣寫好之后就相當於一個棧內存的變量了,不如直接int a(5)這樣寫,哈哈,其實想要實現的是如果有需要成對呈現的操作,且有邏輯上的先后關系的話就可以把成對的操作放到上面模板類的構造和析構里面,這樣就能夠確保成對的操作成對出現了

scoped_lock(c++17)

unique_lock

unique_lock和lock_guard不同的是:unique_lock的成員函數lockunlock允許使用者更靈活的加鎖和解鎖。

#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <mutex>          // std::mutex, std::unique_lock

std::mutex mtx;           // mutex for critical section

void print_block (int n, char c) {
  // critical section (exclusive access to std::cout signaled by lifetime of lck):
  std::unique_lock<std::mutex> lck (mtx, std::defer_lock);
  bool res = lck.try_lock();
  res = mtx.try_lock();
  for (int i=0; i<n; ++i) { std::cout << c; }
  std::cout << '\n';
}

int main ()
{
  std::thread th1 (print_block,50,'*');
  std::thread th2 (print_block,50,'$');

  th1.join();
  th2.join();

  return 0;
}

c++11有三種加鎖策略:

策略 tag type 描述
(默認) 請求鎖,阻塞當前線程直到成功獲得鎖。
std::defer_lock std::defer_lock_t 不請求鎖。
std::try_to_lock std::try_to_lock_t 嘗試請求鎖,但不阻塞線程,鎖不可用時也會立即返回。
std::adopt_lock std::adopt_lock_t 假定當前線程已經獲得互斥對象的所有權,所以不再請求鎖。

上面的例子中用到了std::defer_lock策略,第十行res為true,是一行res為false,不能對同一mutex重復加鎖。這里如果加鎖策略是默認值,運行到第十行的時候程序會直接崩潰,原因我還不太清楚~,總之try_lock和默認的鎖策略是沖突的,不能一起使用。

std::mutex mt;
std::unique_lock<std::mutex> lck(mt, std::defer_lock);
assert(lck.owns_lock() == false);
lck.lock();
assert(lck.owns_lock() == true);

演示第一種加鎖策略,其他策略請讀者自行嘗試~

defer_lock_t

try_to_lock_t

adopt_lock_t

defer_lock

try_to_lock

adopt_lock

上面6個代表了三種加鎖策略的類型。

通用鎖算法

try_lock

#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <mutex>          // std::mutex, std::try_lock
#include <chrono>

std::mutex foo,bar;

void task_a () {
    foo.lock();
    std::cout << "task a\n";
    bar.lock();
    // ...
//    std::this_thread::sleep_for(std::chrono::microseconds(1000)); //1
    foo.unlock();
//    std::this_thread::sleep_for(std::chrono::microseconds(1000)); //2
    bar.unlock();
}

void task_b () {
    int x = try_lock(foo, bar);
    std::cout << "x = " << x << std::endl;
    if (x==-1) {
        std::cout << "task b\n";
        // ...
        bar.unlock();
        foo.unlock();
    }
    else {
        std::cout << "[task b failed: mutex " << (x?"bar":"foo") << " locked]\n";
    }
}

int main ()
{
    std::thread th1 (task_a);
    std::thread th2 (task_b);
    th1.join();
    th2.join();
    return 0;
}

std::try_lock全局函數

嘗試使用其try_lock成員函數鎖定所有作為參數傳遞的對象(非阻塞)。

該函數為每個參數(第一個a,然后b,最后是cde中的其他參數,以相同的順序)調用try_lock成員函數,直到所有調用成功,或者其中一個調用失敗(通過返回 錯誤或引發異常)。

如果函數由於調用失敗而結束,則對try_lock調用成功的所有對象都將解鎖,並且該函數將返回鎖定失敗的對象的參數順序號。 對參數列表中的其余對象不執行進一步的調用。

13和15行都注釋掉的話, x = -1 表示foo和bar都加鎖成功

13去掉注釋, x = 0, 表示參數列表里第一個加鎖失敗的是foo

15行去掉注釋, x = 1,表示參數列表里第一個加鎖失敗的是bar

lock

// std::lock example
#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <mutex>          // std::mutex, std::lock

std::mutex foo,bar;

void task_a () {
  // foo.lock(); bar.lock(); // replaced by:
  std::lock (foo,bar);
  std::cout << "task a\n";
  foo.unlock();
  bar.unlock();
}

void task_b () {
  // bar.lock(); foo.lock(); // replaced by:
  std::lock (bar,foo);
  std::cout << "task b\n";
  bar.unlock();
  foo.unlock();
}

int main ()
{
  std::thread th1 (task_a);
  std::thread th2 (task_b);

  th1.join();
  th2.join();

  return 0;
}

鎖定作為參數傳遞的所有對象,並在必要時阻止調用線程。

該函數使用對對象的成員lock,try_lock和unlock的未指定調用序列來鎖定對象,以確保所有參數在返回時都被鎖定(不產生任何死鎖)。

如果該函數無法鎖定所有對象(例如,由於內部調用之一引發了異常),則該函數會在失敗之前首先解鎖成功鎖定的所有對象(如果有)。

注:有點像數據庫事務的邏輯,要成功都成功,有一個失敗就rollback。

單次調用

once_flag

call_once

#include <iostream>       // std::cout
#include <thread>         // std::thread, std::this_thread::sleep_for
#include <chrono>         // std::chrono::milliseconds
#include <mutex>          // std::call_once, std::once_flag

int winner;
void set_winner (int x) { winner = x; }
std::once_flag winner_flag;

void wait_1000ms (int id) {
  // count to 1000, waiting 1ms between increments:
  for (int i=0; i<1000; ++i)
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  // claim to be the winner (only the first such call is executed):
  std::call_once (winner_flag,set_winner,id);
}

int main ()
{
  std::thread threads[10];
  // spawn 10 threads:
  for (int i=0; i<10; ++i)
    threads[i] = std::thread(wait_1000ms,i+1);

  std::cout << "waiting for the first among 10 threads to count 1000 ms...\n";

  for (auto& th : threads) th.join();
  std::cout << "winner thread: " << winner << '\n';

  return 0;
}

調用傳遞參數args的fn,除非另一個線程已經執行(或正在執行)具有相同標志的對call_once的調用。

如果另一個線程已經在主動執行帶有相同標志的對call_once的調用,則將導致被動執行:被動執行不調用fn但不會返回,直到主動執行本身返回並且此時所有可見副作用都已同步在所有使用相同標志的並發調用中。

如果對call_once的主動調用通過拋出異常(傳播到其調用線程)而結束,並且存在被動執行,則在這些被動執行中選擇一個,並稱為新的主動調用。

請注意,一旦返回了主動執行,所有當前的被動執行和將來對call_once的調用(具有相同標志)也將返回而不會變為主動執行。

活動執行使用fn和args的左值或右值引用的衰變副本,而忽略fn返回的值

條件變量

condition_variable

// condition_variable example
#include <iostream>           // std::cout
#include <thread>             // std::thread
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void print_id (int id) {
  //(id != 1)
     //std::this_thread::sleep_for(MS(10));
  std::unique_lock<std::mutex> lck(mtx);
  while (!ready) cv.wait(lck);//RAII機制阻塞線程,ready的判斷是為了防止如果有的線程還沒有跑到這一步主線程就調用了go()函數,則會造成部分線程未執行wait,也就不會接收到notify_all()通知
  // ...
  std::cout << "thread " << id << '\n';
}

void go() {
  std::unique_lock<std::mutex> lck(mtx);
  ready = true;
  cv.notify_all();
}

int main ()
{
  std::thread threads[10];
  // spawn 10 threads:
  for (int i=0; i<10; ++i)
    threads[i] = std::thread(print_id,i);

  std::cout << "10 threads ready to race...\n";
  go();                       // go!

  for (auto& th : threads) th.join();

  return 0;
}

代碼分析:代碼實現的是創建10個線程,然后讓線程都處於等待的狀態,然后通過對同一個mutex操作的notify_all函數同時喚醒10個線程,打印順序就是10個線程的名次。

這里需要簡單說一下std::condition_variable::wait()函數,在wait()的時候會釋放鎖,在被notify_one或者notify_all喚醒之后重新上鎖。wait()的一個重載形式是:

template<class _Predicate>
void wait(unique_lock<mutex>& _Lck, _Predicate _Pred);

_Pred是一個返回bool的表達式,當返回值為true,wait()接收到notify信號就解除阻塞;當返回值為false,wait()接收到notify信號依然阻塞。

所以通過我們對wait()的了解就可以清楚的分析出程序的運行流程,目前程序運行的結果是:

1 2 5 8 9 0 3 4 6 7

如果在15行的循環里加上打印語句會發現所有的子線程都wait()成功了,然后釋放了鎖,go()函數獲取到鎖就可以走接下來的流程,通知所有的線程競爭執行。

如果加上12、13行,先執行go()了 ready = true之后就只有id=1的線程會阻塞了,所以例子中的寫法還是稍微有點問題的,感興趣的朋友可以自己去拓展~

    #include <iostream>
    #include <string>
    #include <thread>
    #include <mutex>
    #include <condition_variable>
    
    std::mutex m;
    std::condition_variable cv;
    std::string data;
    bool ready = false;
    bool processed = false;
    
    void worker_thread()
    {
        // 等待直至 main() 發送數據
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, []{return ready;});
    
        // 等待后,我們占有鎖。
        std::cout << "Worker thread is processing data\n";
        data += " after processing";
    
        // 發送數據回 main()
        processed = true;
        std::cout << "Worker thread signals data processing completed\n";
    
        // 通知前完成手動解鎖,以避免等待線程才被喚醒就阻塞(細節見 notify_one )
        lk.unlock();
        cv.notify_one();
    }
    
    int main()
    {
        std::thread worker(worker_thread);
    
        data = "Example data";
        // 發送數據到 worker 線程
        {
            std::lock_guard<std::mutex> lk(m);
            ready = true;
            std::cout << "main() signals data ready for processing\n";
        }
        cv.notify_one();
    
        // 等候 worker
        {
            std::unique_lock<std::mutex> lk(m);
            cv.wait(lk, []{return processed;});
        }
        std::cout << "Back in main(), data = " << data << '\n';
    
        worker.join();
    }
    

執行結果:

main() signals data ready for processing
Worker thread is processing data
Worker thread signals data processing completed
Back in main(), data = Example data after processing

condition_variable 類是同步原語,能用於阻塞一個線程,或同時阻塞多個線程,直至另一線程修改共享變量(條件)並通知 condition_variable

有意修改變量的線程必須

  1. 獲得 std::mutex (常通過 std::lock_guard

  2. 在保有鎖時進行修改

  3. std::condition_variable 上執行 notify_onenotify_all (不需要為通知保有鎖)

    即使共享變量是原子的,也必須在互斥下修改它,以正確地發布修改到等待的線程。

    任何有意在 std::condition_variable 上等待的線程必須

    1. 在與用於保護共享變量者相同的互斥上獲得 std::unique_lock<std::mutex>
    2. 執行下列之一:
      1. 檢查條件,是否為已更新或提醒它的情況
      2. 執行 waitwait_forwait_until ,等待操作自動釋放互斥,並懸掛線程的執行。
      3. condition_variable 被通知時,時限消失或虛假喚醒發生,線程被喚醒,且自動重獲得互斥。之后線程應檢查條件,若喚醒是虛假的,則繼續等待。

    放一下wait_for的例子:

    // condition_variable::wait_for example
    #include <iostream>           // std::cout
    #include <thread>             // std::thread
    #include <chrono>             // std::chrono::seconds
    #include <mutex>              // std::mutex, std::unique_lock
    #include <condition_variable> // std::condition_variable, std::cv_status
    
    std::condition_variable cv;
    
    int value;
    
    void read_value() {
      std::cin >> value;
      cv.notify_one();
    }
    
    int main ()
    {
      std::cout << "Please, enter an integer (I'll be printing dots): \n";
      std::thread th (read_value);
    
      std::mutex mtx;
      std::unique_lock<std::mutex> lck(mtx);
      while (cv.wait_for(lck,std::chrono::seconds(1))==std::cv_status::timeout) {
        std::cout << '.' << std::endl;
      }
      std::cout << "You entered: " << value << '\n';
    
      th.join();
    
      return 0;
    }
    

condition_variable_any

與condition_variable相同,不同之處在於其等待函數可以將任何可鎖定類型用作參數(condition_variable對象只能采用unique_lock )。 除此之外,它們是相同的。

// condition_variable_any::wait (with predicate)
#include <iostream>           // std::cout
#include <thread>             // std::thread, std::this_thread::yield
#include <mutex>              // std::mutex
#include <condition_variable> // std::condition_variable_any

std::mutex mtx;
std::condition_variable_any cv;

int cargo = 0;
bool shipment_available() {return cargo!=0;}

void consume (int n) {
  for (int i=0; i<n; ++i) {
    mtx.lock();
    cv.wait(mtx,shipment_available);
    // consume:
    std::cout << cargo << '\n';
    cargo=0;
    mtx.unlock();
  }
}

int main ()
{
  std::thread consumer_thread (consume,10);

  // produce 10 items when needed:
  for (int i=0; i<10; ++i) {
    while (shipment_available()) std::this_thread::yield();
    mtx.lock();
    cargo = i+1;
    cv.notify_one();
    mtx.unlock();
  }

  consumer_thread.join();

  return 0;
}

notify_all_at_thread_exit

// notify_all_at_thread_exit
#include <iostream>           // std::cout
#include <thread>             // std::thread
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void print_id (int id) {
  std::unique_lock<std::mutex> lck(mtx);
  while (!ready) cv.wait(lck);
  // ...
  std::cout << "thread " << id << '\n';
}

void go() {
  std::unique_lock<std::mutex> lck(mtx);
  std::notify_all_at_thread_exit(cv,std::move(lck));
  ready = true;
}

int main ()
{
  std::thread threads[10];
  // spawn 10 threads:
  for (int i=0; i<10; ++i)
    threads[i] = std::thread(print_id,i);
  std::cout << "10 threads ready to race...\n";

  std::thread(go).detach();   // go!

  for (auto& th : threads) th.join();

  return 0;
}

跟之前的例子差不多,看一下應該就能理解~

cv_status

wait_for()的返回值類型。

Future

promise

// promise example
#include <iostream>       // std::cout
#include <functional>     // std::ref
#include <thread>         // std::thread
#include <future>         // std::promise, std::future

void print_int (std::future<int>& fut) {
  int x = fut.get();
  std::cout << "value: " << x << '\n';
}

int main ()
{
  std::promise<int> prom;                      // create promise

  std::future<int> fut = prom.get_future();    // engagement with future

  std::thread th1 (print_int, std::ref(fut));  // send future to new thread

  prom.set_value (10);                         // fulfill promise
                                               // (synchronizes with getting the future)
  th1.join();
  return 0;
}

Promise是一個對象,它可以存儲要由Future對象(可能在另一個線程中)檢索的T類型的值,並提供一個同步點。

在構造上,promise對象與新的共享狀態相關聯,在它們上可以存儲T類型的值或從std :: exception派生的異常。

可以通過調用成員get_future將該共享狀態與Future的對象相關聯。 調用之后,兩個對象共享相同的共享狀態:
-Promise對象是異步提供程序,應在某個時候為共享狀態設置一個值。
-Future對象是一個異步返回對象,可以檢索共享狀態的值,並在必要時等待其准備就緒。

共享狀態的生存期至少要持續到與之關聯的最后一個對象釋放它或銷毀它為止。 因此,如果它也與Future相關聯,則它可以幸免最初獲得它的Promise對象。

std::future::get()是阻塞的,直到另外一個線程調用std::promise::set_value()。

packaged_task

// packaged_task example
#include <iostream>     // std::cout
#include <future>       // std::packaged_task, std::future
#include <chrono>       // std::chrono::seconds
#include <thread>       // std::thread, std::this_thread::sleep_for

// count down taking a second for each value:
int countdown (int from, int to) {
  for (int i=from; i!=to; --i) {
    std::cout << i << '\n';
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }
  std::cout << "Lift off!\n";
  return from-to;
}

int main ()
{
  std::packaged_task<int(int,int)> tsk (countdown);   // set up packaged_task
  std::future<int> ret = tsk.get_future();            // get future

  std::thread th (std::move(tsk),10,0);   // spawn thread to count down from 10 to 0

  // ...

  int value = ret.get();                  // wait for the task to finish and get result

  std::cout << "The countdown lasted for " << value << " seconds.\n";

  th.join();

  return 0;
}

std::packaged_task用法類似std::function,但是返回結果可以通過關聯的std::future獲取。

future

// future example
#include <iostream>       // std::cout
#include <future>         // std::async, std::future
#include <chrono>         // std::chrono::milliseconds

// a non-optimized way of checking for prime numbers:
bool is_prime (int x) {
  for (int i=2; i<x; ++i) if (x%i==0) return false;
  return true;
}

int main ()
{
  // call function asynchronously:
  std::future<bool> fut = std::async (is_prime,444444443); 

  // do something while waiting for function to set future:
  std::cout << "checking, please wait";
  std::chrono::milliseconds span (100);
  while (fut.wait_for(span)==std::future_status::timeout)
    std::cout << '.' << std::flush;

  bool x = fut.get();     // retrieve return value

  std::cout << "\n444444443 " << (x?"is":"is not") << " prime.\n";

  return 0;
}

shared_future

async

// async example
#include <iostream>       // std::cout
#include <future>         // std::async, std::future

// a non-optimized way of checking for prime numbers:
bool is_prime (int x) {
  std::cout << "Calculating. Please, wait...\n";
  for (int i=2; i<x; ++i) if (x%i==0) return false;
  return true;
}

int main ()
{
  // call is_prime(313222313) asynchronously:
  std::future<bool> fut = std::async (is_prime,313222313);

  std::cout << "Checking whether 313222313 is prime.\n";
  // ...

  bool ret = fut.get();      // waits for is_prime to return

  if (ret) std::cout << "It is prime!\n";
  else std::cout << "It is not prime.\n";

  return 0;
}

launch

future_status

Future錯誤

future_error
future_category
future_errc

Future目前本身用的比較少,目前只是列出來比較常見的一些類和函數的用法,之后再遇到復雜的使用場景的時候再把相關內容補上~

線程池代碼:

  • threadpool.h
#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <deque>
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>

class Task
{
public:
    Task(int id);
    void Do();
private:
    int m_ID;
};

class MyThread : public std::thread
{
public:
    MyThread();
private:
    bool isFree;
};

class ThreadPool
{
public:
    ThreadPool();
~ThreadPool();
    void Start(int tNum);
    void AppendTask(const Task* task);
    void Stop();
    void Join();

private:
    void work();
private:
    std::deque<Task*> m_Tasks;
    std::vector<std::thread*> m_Threads;
    std::mutex  m_Mutex;
    std::condition_variable m_Cond;

    bool m_IsRunning;
};

#endif // THREADPOOL_H

  • threadpool.cpp
#include <functional>
#include <iostream>
#include <chrono>
#include "threadpool.h"

using DoTask = std::function<void()>();
using MS = std::chrono::seconds;

ThreadPool::ThreadPool()
{

}

ThreadPool::~ThreadPool()
{
    if (m_IsRunning)
        Stop();
}

void ThreadPool::Start(int tNum)
{
    std::unique_lock<std::mutex> ul(m_Mutex);
    m_IsRunning = true;
    for(int n = 0; n < tNum; ++n)
        m_Threads.emplace_back(new std::thread(&ThreadPool::work, this));

    std::cout << __FUNCTION__ << std::endl;
}

void ThreadPool::AppendTask(const Task *task)
{
    m_Tasks.push_back(const_cast<Task*>(task));
    m_Cond.notify_one();
}

void ThreadPool::Stop()
{
    {
        std::unique_lock<std::mutex> mt(m_Mutex);
        m_IsRunning = false;
        m_Cond.notify_all();
    }

    for(std::thread* t : m_Threads)
    {
        if(t->joinable())
            t->join();
    }
}

void ThreadPool::Join()
{

}

void ThreadPool::work()
{
    while(m_IsRunning)
    {
        Task* task = nullptr;
        {
            std::unique_lock<std::mutex> ul(m_Mutex);
            if(!m_Tasks.empty())
            {
                task = m_Tasks.back();
                m_Tasks.pop_back();
            }
            else
            {
                std::cout << "wait id = " << std::this_thread::get_id() << std::endl;
                m_Cond.wait(ul);
            }
        }
        if(task)
            task->Do();
    }
}

Task::Task(int id)
    : m_ID(id)
{

}

void Task::Do()
{
    std::mutex mu;
    std::unique_lock<std::mutex> ul(mu);
    std::this_thread::sleep_for(MS(m_ID));
    std::cout << "thread id: " << std::this_thread::get_id() << " do task: " << m_ID << std::endl;
}
  • main.cpp
#include <iostream>
#include "threadpool.h"
using namespace std;

int main()
{
    ThreadPool tp;
    tp.Start(10);

    int n = 0;
    Task task1(++n);
    Task task2(++n);
    Task task3(++n);
    Task task4(++n);

    while(1)
    {
        char c;
        std::cout << "please input char c to input tasks:" << std::endl;
        std::cin >> c;
        if(c == 'c')
        {
            tp.AppendTask(&task1);
            tp.AppendTask(&task2);
            tp.AppendTask(&task3);
            tp.AppendTask(&task4);
        }
        else
        {
            continue;
        }
    }

    tp.Join();
    return 0;
}

關鍵點:

  1. 剛開始根據參數創建固定數量的線程
  2. 線程函數是成員函數
  3. 每個線程是一個跑在一個死循環里面
    1. 等待 ->執行任務->等待
    2. 通過一個isRunning標志位決定該線程是否退出,在stop函數里設置isRunning = false
  4. 標准的生產者-消費者模型


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM