解讀github上流行的ThreadPool源碼


前言

偶然發現github上有個ThreadPool項目(https://github.com/progschj/ThreadPool ),star數居然3k+,里面也就兩個文件,一個ThreadPool.h,一個example.cpp

看了一下,項目代碼是cpp11寫的。老實說,代碼極其簡潔又難懂。

下面是ThreadPool.h可以看看,有個直觀印象。

#ifndef THREAD_POOL_H
  #define THREAD_POOL_H
  ​
  #include <vector>
  #include <queue>
  #include <memory>
  #include <thread>
  #include <mutex>
  #include <condition_variable>
  #include <future>
  #include <functional>
  #include <stdexcept>class ThreadPool {
  public:
      ThreadPool(size_t);
      template<class F, class... Args>
      auto enqueue(F&& f, Args&&... args) 
          -> std::future<typename std::result_of<F(Args...)>::type>;
      ~ThreadPool();
  private:
      // need to keep track of threads so we can join them
      std::vector< std::thread > workers;
      // the task queue
      std::queue< std::function<void()> > tasks;
      
      // synchronization
      std::mutex queue_mutex;
      std::condition_variable condition;
      bool stop;
  };
   
  // the constructor just launches some amount of workers
  inline ThreadPool::ThreadPool(size_t threads)
      :   stop(false)
  {
      for(size_t i = 0;i<threads;++i)
          workers.emplace_back(
              [this]
              {
                  for(;;)
                  {
                      std::function<void()> task;
  ​
                      {
                          std::unique_lock<std::mutex> lock(this->queue_mutex);
                          this->condition.wait(lock,
                              [this]{ return this->stop || !this->tasks.empty(); });
                          if(this->stop && this->tasks.empty())
                              return;
                          task = std::move(this->tasks.front());
                          this->tasks.pop();
                      }
  ​
                      task();
                  }
              }
          );
  }
  ​
  // add new work item to the pool
  template<class F, class... Args>
  auto ThreadPool::enqueue(F&& f, Args&&... args) 
      -> std::future<typename std::result_of<F(Args...)>::type>
  {
      using return_type = typename std::result_of<F(Args...)>::type;
  ​
      auto task = std::make_shared< std::packaged_task<return_type()> >(
              std::bind(std::forward<F>(f), std::forward<Args>(args)...)
          );
          
      std::future<return_type> res = task->get_future();
      {
          std::unique_lock<std::mutex> lock(queue_mutex);
  ​
          // don't allow enqueueing after stopping the pool
          if(stop)
              throw std::runtime_error("enqueue on stopped ThreadPool");
  ​
          tasks.emplace([task](){ (*task)(); });
      }
      condition.notify_one();
      return res;
  }
  ​
  // the destructor joins all threads
  inline ThreadPool::~ThreadPool()
  {
      {
          std::unique_lock<std::mutex> lock(queue_mutex);
          stop = true;
      }
      condition.notify_all();
      for(std::thread &worker: workers)
          worker.join();
  }
  ​
  #endif

 

example.cpp是對線程池ThreadPool.h的調用。

#include <iostream>
  #include <vector>
  #include <chrono>
  ​
  #include "ThreadPool.h"int main()
  {
      
      ThreadPool pool(4);
      std::vector< std::future<int> > results;
  ​
      for(int i = 0; i < 8; ++i) {
          results.emplace_back(
              pool.enqueue([i] {
                  std::cout << "hello " << i << std::endl;
                  std::this_thread::sleep_for(std::chrono::seconds(1));
                  std::cout << "world " << i << std::endl;
                  return i*i;
              })
          );
      }
  ​
      for(auto && result: results)
          std::cout << result.get() << ' ';
      std::cout << std::endl;
      
      return 0;
  }

 

看了以上代碼應該是要勸退不少人了,反正我的第一感覺是這樣的。

ThreadPool分析

ThreadPool類中有:

5個成員變量

  • std::vector< std::thread > workers 用於存放線程的數組,用vector容器保存

  • std::queue< std::function<void()> > tasks 用於存放任務的隊列,用queue隊列進行保存。任務類型為std::function<void()>。因為 std::function是通用多態函數封裝器,也就是說本質上任務隊列中存放的是一個個函數

  • std::mutex queue_mutex 一個訪問任務隊列的互斥鎖,在插入任務或者線程取出任務都需要借助互斥鎖進行安全訪問

  • std::condition_variable condition 一個用於通知線程任務隊列狀態的條件變量,若有任務則通知線程可以執行,否則進入wait狀態

  • bool stop 標識線程池的狀態,用於構造與析構中對線程池狀態的了解

3個成員函數

  • ThreadPool(size_t) 線程池的構造函數

  • auto enqueue(F&& f, Args&&... args) 將任務添加到線程池的任務隊列中

  • ~ThreadPool() 線程池的析構函數

  class ThreadPool {
  public:
      ThreadPool(size_t);
      template<class F, class... Args>
      auto enqueue(F&& f, Args&&... args) 
          -> std::future<typename std::result_of<F(Args...)>::type>;
      ~ThreadPool();
  private:
      // need to keep track of threads so we can join them
      std::vector< std::thread > workers;
      // the task queue
      std::queue< std::function<void()> > tasks;
      
      // synchronization
      std::mutex queue_mutex;
      std::condition_variable condition;
      bool stop;
  };

 

構造函數解析

  inline ThreadPool::ThreadPool(size_t threads)
      :   stop(false)
  {
      for(size_t i = 0;i<threads;++i)
          workers.emplace_back(
              [this]
              {
                  for(;;)
                  {
                      std::function<void()> task;
  ​
                      {
                          std::unique_lock<std::mutex> lock(this->queue_mutex);
                          this->condition.wait(lock,
                              [this]{ return this->stop || !this->tasks.empty(); });
                          if(this->stop && this->tasks.empty())
                              return;
                          task = std::move(this->tasks.front());
                          this->tasks.pop();
                      }
  ​
                      task();
                  }
              }
          );
  }

 

構造函數定義為inline。

接收參數threads表示線程池中要創建多少個線程。

初始化成員變量stopfalse,即表示線程池啟動着。

然后進入for循環,依次創建threads個線程,並放入線程數組workers中。

在vector中,emplace_back()成員函數的作用是在容器尾部插入一個對象,作用效果與push_back()一樣,但是兩者有略微差異,即emplace_back(args)中放入的對象的參數,而push_back(OBJ(args))中放入的是對象。即emplace_back()直接在容器中以傳入的參數直接調用對象的構造函數構造新的對象,而push_back()中先調用對象的構造函數構造一個臨時對象,再將臨時對象拷貝到容器內存中。

我們知道,在C++11中,創建線程的方式為:

   std::thread t(fun);    //fun為線程的執行函數

 

所以,上述workers.emplace_back()中,我們傳入的lambda表達式就是創建線程的fun()函數。

下面來分析下該lambda表達式:

  [this]{
      for(;;)
      {
          std::function<void()> task;
  ​
          {
              std::unique_lock<std::mutex> lock(this->queue_mutex);
              this->condition.wait(lock,
                                   [this]{ return this->stop || !this->tasks.empty(); });
              if(this->stop && this->tasks.empty())
                  return;
              task = std::move(this->tasks.front());
              this->tasks.pop();
          }
  ​
          task();
      }
  }

 

lambda表達式的格式為:

[ 捕獲 ] ( 形參 ) 說明符(可選) 異常說明 attr -> 返回類型 { 函數體 }

所以上述lambda表達式為 [ 捕獲 ] { 函數體 } 類型。

該lambda表達式捕獲線程池指針this用於在函數體中使用(調用線程池成員變量stop、tasks等)

分析函數體,for(;;)為一個死循環,表示每個線程都會反復這樣執行,這其實每個線程池中的線程都會這樣。

在循環中,,先創建一個封裝void()函數的std::function對象task,用於接收后續從任務隊列中彈出的真實任務。

在C++11中,

  std::unique_lock<std::mutex> lock(this->queue_mutex);

 

可以在退出作用區域時自動解鎖,無需顯式解鎖。所以,{}起的作用就是在退出 } 時自動回釋放線程池的queue_mutex。

在{}中,我們先對任務隊列加鎖,然后根據條件變量判斷條件是否滿足。

  void
  wait(unique_lock<mutex>& lock, _Predicate p)
  {
      while (!p())
          wait(lock);
  }

 

為條件標量wait的運行機制, wait在p 為false的狀態下,才會進入wait(lock)狀態。當前線程阻塞直至條件變量被通知

   this->condition.wait(lock,[this]{ return this->stop || !this->tasks.empty(); });

 

所以p表示上述代碼中的lambda表達式[this]{ return this->stop || !this->tasks.empty(); },其中this->stop為false, !this->tasks.empty()也為false。即其表示若線程池已停止或者任務隊列中不為空,則不會進入到wait狀態。

由於剛開始創建線程池,線程池表示未停止,且任務隊列為空,所以每個線程都會進入到wait狀態。

 

 

 

(借用 https://blog.csdn.net/shichao1470/article/details/89856443 一張圖便於說明wait的過程)

在線程池剛剛創建,所有的線程都阻塞在了此處,即wait處。

若后續條件變量來了通知,線程就會繼續往下進行:

  if(this->stop && this->tasks.empty())
      return;

 

若線程池已經停止且任務隊列為空,則線程返回,沒必要進行死循環。

  task = std::move(this->tasks.front());
  this->tasks.pop();

 

這樣,將任務隊列中的第一個任務用task標記,然后將任務隊列中該任務彈出。(此處線程實在獲得了任務隊列中的互斥鎖的情況下進行的,從上圖可以看出,在條件標量喚醒線程后,線程在wait周期內得到了任務隊列的互斥鎖才會繼續往下執行。所以最終只會有一個線程拿到任務,不會發生驚群效應)

在退出了{ },我們隊任務隊列的所加的鎖也釋放了,然后我們的線程就可以執行我們拿到的任務task了,執行完畢之后,線程又進入了死循環。

至此,我們分析了ThreadPool的構造函數。

添加任務函數解析

  template<class F, class... Args>
  auto ThreadPool::enqueue(F&& f, Args&&... args) 
      -> std::future<typename std::result_of<F(Args...)>::type>
  {
      using return_type = typename std::result_of<F(Args...)>::type;
  ​
      auto task = std::make_shared< std::packaged_task<return_type()> >(
              std::bind(std::forward<F>(f), std::forward<Args>(args)...)
          );
          
      std::future<return_type> res = task->get_future();
      {
          std::unique_lock<std::mutex> lock(queue_mutex);
  ​
          // don't allow enqueueing after stopping the pool
          if(stop)
              throw std::runtime_error("enqueue on stopped ThreadPool");
  ​
          tasks.emplace([task](){ (*task)(); });
      }
      condition.notify_one();
      return res;
  }

 

添加任務的函數本來不難理解,但是作者增加了許多新的C++11特性,這樣就變得難以理解了。

  template<class F, class... Args>
  auto ThreadPool::enqueue(F&& f, Args&&... args) 
      -> std::future<typename std::result_of<F(Args...)>::type>

 

equeue是一個模板函數,其類型形參為F與Args。其中class... Args表示多個類型形參。

auto用於自動推導出equeue的返回類型,函數的形參為(F&& f, Args&&... args),其中&&表示右值引用。表示接受一個F類型的f,與若干個Args類型的args。

-> std::future<typename std::result_of<F(Args...)>::type>

表示返回類型,與lambda表達式中的表示方法一樣。

返回的是什么類型呢?

  typename std::result_of<F(Args...)>::type   //獲得以Args為參數的F的函數類型的返回類型
  std::future<typename std::result_of<F(Args...)>::type>
  //std::future用來訪問異步操作的結果

 

所以,最終返回的是放在std::future中的F(Args…)返回類型的異步執行結果。

舉個簡單的例子來理解吧:

  
  // 來自 packaged_task 的 future
  std::packaged_task<int()> task([](){ return 7; }); // 包裝函數,將lambda表達式進行包裝
  std::future<int> f1 = task.get_future();  // 定義一個future對象f1,存放int型的值。此處已經表明:將task掛載到線程上執行,然后返回的結果才會保存到f1中
  std::thread(std::move(task)).detach(); // 將task函數掛載在線程上運行
  ​
  f1.wait();  //f1等待異步結果的輸入
  f1.get();   //f1獲取到的異步結果
  
  struct S {
      double operator()(char, int&);
      float operator()(int) { return 1.0;}
  };
  ​
  std::result_of<S(char, int&)>::type d = 3.14; // d 擁有 double 類型,等價於double d = 3.14
  std::result_of<S(int)>::type x = 3.14; // x 擁有 float 類型,等價於float x = 3.14

 

經過上述兩個簡單的小例子可以知道:

  -> std::future<typename std::result_of<F(Args...)>::type>
  //等價於
  //F(Args...) 為  int f(args)
  //std::result_of<F(Args...)>::type  表示為 int
  //std::future<int> f1
  //return f1
  //在后續我們根據f1.get就可以取出存放在里面的int值
  //最終返回了一個F(Args...)類型的值,而這個值是存儲在std::future中,因為線程是異步處理的
 
        

接着分析:

  
 using return_type = typename std::result_of<F(Args...)>::type;

表示使用return_type表示F(Args...)的返回類型。

  
  auto task = std::make_shared< std::packaged_task<return_type()> >(
              std::bind(std::forward<F>(f), std::forward<Args>(args)...)
          );

 

由上述小例子,我們已經知道std::packaged_task是一個包裝函數,所以

  auto sp = std::make_shared<C>(12);   --->   auto sp = new C(12)  //創建一個智能指針sp,其指向一個用12初始化的C類對象
      
  std::packaged_task<return_type()>   //表示包裝一個返回值為return_type的函數
      
  auto task = std::make_shared< std::packaged_task<return_type()> > (std::bind(std::forward<F>(f), std::forward<Args>(args)...)   //創建一個智能指針task,其指向一個用std::bind(std::forward<F>(f), std::forward<Args>(args)... 來初始化的 std::packaged_task<return_type()> 對象
//即  std::packaged_task<return_type()> t1(std::bind(std::forward<F>(f), std::forward<Args>(args)...)
  //然后task指向了t1,即task指向了返回值為return_type的f(args)
      
  std::packaged_task<int()> task(std::bind(f, 2, 11));    //將函數f(2,11)打包成task,其返回值為int

 

 

所以最終,task指向了傳遞進來的函數。

   std::future<return_type> res = task->get_future();
  //res中保存了類型為return_type的變量,有task異步執行完畢才可以將值保存進去

 

所以,res會在異步執行完畢后即可獲得所求。

  {
      std::unique_lock<std::mutex> lock(queue_mutex);
  ​
      // don't allow enqueueing after stopping the pool
      if(stop)
          throw std::runtime_error("enqueue on stopped ThreadPool");
  ​
      tasks.emplace([task](){ (*task)(); });  //(*task)() ---> f(args)
  }

 

在新的作用於內加鎖,若線程池已經停止,則拋出異常。

否則,將task所指向的f(args)插入到tasks任務隊列中。需要指出,這兒的emplace中傳遞的是構造函數的參數。

  condition.notify_one(); //任務加入任務隊列后,需要去喚醒一個線程
  return res; //待線程執行完畢,將異步執行的結果返回
 
        

經過上述分析,這樣將每個人物插入到任務隊列中的過程就完成了。

 

析構函數解析

  inline ThreadPool::~ThreadPool()
  {
      {
          std::unique_lock<std::mutex> lock(queue_mutex);
          stop = true;
      }
      condition.notify_all();
      for(std::thread &worker: workers)
          worker.join();
  }

 

在析構函數中,先對任務隊列中加鎖,將停止標記設置為true,這樣后續即使有新的插入任務操作也會執行失敗。

使用條件變量喚醒所有線程,所有線程都會往下執行:

  if(this->stop && this->tasks.empty())
      return;

 

在stop設置為true且任務隊列中為空時,對應的線程進而跳出循環結束。

  for(std::thread &worker: workers)
     worker.join();

 

將每個線程設置為join,等到每個線程結束完畢后,主線程再退出。

 

主函數解析

  ThreadPool pool(4); //創建一個線程池,池中線程為4
  std::vector< std::future<int> > results;    //創建一個保存std::future<int>的數組,用於存儲4個異步線程的結果
for(int i = 0; i < 8; ++i) {    //創建8個任務
      results.emplace_back(   //一次保存每個異步結果
          pool.enqueue([i] {  //將每個任務插入到任務隊列中,每個任務的功能均為“打印+睡眠1s+打印+返回結果”
              std::cout << "hello " << i << std::endl;
              std::this_thread::sleep_for(std::chrono::seconds(1));
              std::cout << "world " << i << std::endl;
              return i*i;
          })
      );
  }
  ​
  for(auto && result: results)    //一次取出保存在results中的異步結果
      std::cout << result.get() << ' ';
  std::cout << std::endl;

 

需要對主函數中的任務函數進行說明:

  [i] {   //將每個任務插入到任務隊列中,每個任務的功能均為“打印+睡眠1s+打印+返回結果”
      std::cout << "hello " << i << std::endl;
      std::this_thread::sleep_for(std::chrono::seconds(1));
      std::cout << "world " << i << std::endl;
      return i*i;
  }

 

這個lambda表達式用來表示一個匿名函數,該函數分寫執行 打印-睡眠-打印-返回結果。

pool.enqueue(fun);

 

對應於類中的

  auto ThreadPool::enqueue(F&& f, Args&&... args) 
      -> std::future<typename std::result_of<F(Args...)>::type>

其中,F&& flambda表達式(或者說fun)的形參,而參數為0。

std::future<typename std::result_of<F(Args...)>::type>

則用來保存 i*i 

對應的

std::result_of<F(Args...)>::type    //int型

上述是簡要的分析。

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM