淺談Pthread


最近由於疫情,一直宅在家里,日子過得非常划水。不過也是趁着這段時間的閑工夫,整理下並行計算一些基本點和常用實現方法。這里就不按照順序展開了,先介紹下共享存儲編程模式中的重要基礎--pthread。

共享存儲

簡要概述

並行程序與串行程序實現上的重要區別就是執行者的數量不止一個,所以既然不是單打獨斗的工作那就需要不同執行者之間進行信息交換,所以一般的並行編程模型(注意這個指的是parallel而不是concurrence)是可以簡單的按照不同處理單元(執行者)之間信息交換的方式而進行分類。我們這里主要討論的是使用共享存儲的編程模型,這種編程模型相對於消息傳遞(MPI)在實現上更為直觀,但是實際調優的時候難度也比較大。
在之前的文章中我說明了,在linux中沒有像win中標准實現的進程模式,我們的進程和線程都是linux task加了對應不同的flags得到的。進程要求的條件更多,要求有自身獨立的堆棧、寄存器和變量空間(code,data,file);線程相對更為輕量,可以共享全局變量等資源,在一個進程下的不同線程甚至可以訪問對方open的文件。
我們對於線程問題也需要注意考慮同步,死鎖和緩存一致性。這里我們會討論前兩種問題,緩存一致性問題一般都是通過多處理器總線一致性協議support的,在編程的時候不需要特意關注。

主要應用

在真實的使用環境中(我主要的使用環境是科學計算),線程的創建相對於進程來講要快上很多,而且及時是最壞情況下,例如我的一個server線程訪問numa遠端內存的時候,mem-cpu的帶寬也是要高於MPI帶寬的。但是共享內存的並行方式擴放性不是很好,工業界幾乎沒有跨節點的共享內存機器(可以通過字典表實現,但是訪存延遲過高),所以在需要大量計算核心跨節點計算的時候,只依靠共享內存的方式是行不通的。我在實際的使用中更傾向於,使用共享內存的方式做單節點上的並行,而節點之前的消息傳遞則使用消息傳遞的模式。可以簡單的理解為共享存儲的並行粒度更細了。

pthread

有了前面的說明,方便這里簡單介紹下pthread。其實早期的時候線程實現對於不同制造商生產的機器是有很大的不同的,但是大家的標准都不能共通,給A機器開發的多線程模式放到B機器上就行不通確實不是一個好主意。所以慢慢的開始有了標准。這里談的pthread其實就是在POSIX標准中的一部分。POSIX的全稱是Potable Operating System Interface,其在Unix-like系統中是通用的。Pthread實際上就是POSIX標准對threads的實現。

創建線程

由於pthrea相對low level,所以它的API其實是比較簡單的。創建thread API如下:

pthread_create(thread,attr,routine,arg)
// thread: 新thread的token id
// attr: 設置thread的某些特性,一般情況下都是NULL
// routine: thread創建之后需要執行的routine
// arg: routine執行的時候需要的參數

這里面需要注意的是,pthread在使用上面的API創建threads的時候,是需要同時bind一個routine去給它執行的。在程序的執行過程中,我們以程序入口(main program)進入,然后執行中調用pthread_create創建線程執行一個函數,該函數在執行的時候脫離主線程執行,最后返回值的時候再join回主線程。我用visio畫了一個圖示(第一次使用+沒有鼠標,畫的太丑):
pthread create標准執行流程
值得一提的是,正如之前所講,pthread是low level的,線程執行函數定義的時候參數列表是void,所以我們不能直接使用arg作為函數變量列表,而是一般使用一個結構體,我們需要在函數執行中,再將之”解包“。而且別忘了函數執行結束之后,將status返回並join到主線程,突出4個字,落葉歸根!
除了這個落葉歸根的作用之外,如果我們深入的理解下join的過程可以知道其同時也起到了barrier的作用。主線程將會在threadid線程沒有join之前block住,可以起到同步的作用。若我們不想使用類似的fork-join模式的時候有要如何操作呢?我們可以使用pthread_detach(threadid)。一旦我們將threadid的線程detach,那么它就相當於離家出走的孩子,不存在落葉歸根的問題了(不會join回來)。試想一個使用場景,我們有一個web server來分享某個同化數據,但是不同用戶可能需要的數據經緯度網格是不一樣的,所以我們在處理的時候往往只是需要一個主線程來監聽用戶的請求,這個主線程將工作分配給工作線程做一些查找、插值重組過程之后我們會直接將數據返回給用戶而不用再告訴主線程。也就是主線程實際上就是起到了大堂經理的作用,真正給我們端茶倒水的還是美麗可人的服務生。這樣的模式中,主線程不需要等待工作線程結束,也不需要對工作線程的資源進行回收,實際上還可以節省系統資源。

一些多線程問題

從一開始的計算機導論課上我們就知道了程序需要有確定性,同一個程序相同條件的輸出應該是確定的。我們玩的可不是薛定諤的計算機。在多線程問題中,我們可以認為不同線程的執行順序不應當影響到我們最后的結果。

同步問題

最簡單的例子就是兩個線程對於share同一個累加變量的問題(我們可以假設緩存是寫直達的,但是這兩個線程的寄存器可能不同)。由於變量累加的過程本身不是原子化的,所以其執行過程中需要不止一個指令,這樣兩個不同線程指令執行的順序就會影響到最終的結果。一個普遍的想法就是使用編譯器提供的原子操作,or設置同步。

同步的實現--Lock

同步問題我們知道了,首先可以聊一聊其最直觀的實現方式--鎖。我們把上面的問題抽象化一些,對於讀操作都是ok的,但是對於寫操作我們需要給share var加上以臨界區來保證同一個寫操作時刻,臨界區中只能有一個線程在執行。線程一般是先要取得臨界區的許可,然后才能進入臨界區中搞事情,最后交還許可,退出臨界區。
那什么又是lock呢,我們只需要把前面所說的許可換成lock不就可以了。lock有很多實現的方式,linux這里使用的是mutex,由於mutex本身是結構體,所以聲名的時候別忘了init一下,like this:

#include "pthread.h"

pthread_mutex_t mutex; //這里要保證是global var,要不別的進程咋知道呢
pthread_mutex_init(&mutex, NULL); //init mutex,你好我也好

pthread_mutex_lock(&mutex); //老司機准備進入臨界區了,各位准備好

// critical section //

pthread_mutex_unlock(&mutex); //釋放鎖,有借有還,再借不難
pthread_mutex_destroy(&mutex); // 過河拆橋,鎖用不上了

Condition vatiables(cv)

注意,它可不是簡歷。
cv的含義其實有點偏向於MPI的思想,傳遞消息。我們可以讓threads wait(乖乖站好!),也可以notify or broadcast線程讓他們起來干活。主要的操作有一下三種:

phtread_cond_init();

pthread_cond_wait(&theCV,&somelock); //這里sleep線程的同時也將somelock釋放掉了,要不其他線程無法取得lock就沒辦法執行(甚至是叫醒它了)
pthread_cond_signal(&theCV); 
pthread_cond_boardcast(&theCV);

需要注意的是,三種thread調度方式需要在一個臨界區中。也就是我們一般使用的時候,是需要和上面的lock同時使用的。那么問題來了,為什么一定要這樣做呢?
這是因為我們在臨界區中處理寫share量的時候,需要保證不同線程對其的訪問是可控的,否則可能在不同線程中讀取到的寫share量不一致進而影響CV的工作,因為一般情況下臨界區中的寫share量就是我們CV工作中的重要判斷量。因此,雖然這個條件相對嚴格,但是是有必要的。

thread pool

講到這里,其實我想表達的是pthread本身是很好的實現,它非常靈活,你可以精確的指定每一個線程的routine讓它們執行完全不同的任務。但是在享受自由的同時,我們也需要承擔應有的責任,這在編程上是十分蛋疼的。所以這里引入一個thread pool的概念來減減壓。
如果說我之前所講的create thread的方式,做個類比,好像是古代打仗的時候臨時一個一個拼湊起來的部隊,東一個洗一個,那么thread pool則是在runtime中一直存在的一個常備軍,waiting for serve。我們可以使用編譯器/虛擬機(like java)提供的thread pool創建線程池。這樣做的好處是顯而易見的,首先我們對於線程的使用是更快的,畢竟不需要每次使用的時候都現場去創建;並且也限制了線程的總數,不會超過系統上線導致server gg。

thread pool實現

賞析一段源碼:

/**
 * threadpool.c
 *
 * This file will contain your implementation of a threadpool.
 */

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

#include "threadpool.h"

// _threadpool is the internal threadpool structure that is
// cast to type "threadpool" before it given out to callers

typedef struct work_st{
	void (*routine) (void*);
	void * arg;
	struct work_st* next;
} work_t;

typedef struct _threadpool_st {
   // you should fill in this structure with whatever you need
	int num_threads;	//number of active threads
	int qsize;			//number in the queue
	pthread_t *threads;	//pointer to threads
	work_t* qhead;		//queue head pointer
	work_t* qtail;		//queue tail pointer
	pthread_mutex_t qlock;		//lock on the queue list
	pthread_cond_t q_not_empty;	//non empty and empty condidtion vairiables
	pthread_cond_t q_empty;
	int shutdown;
	int dont_accept;
} _threadpool;

/* This function is the work function of the thread */
void* do_work(threadpool p) {
	_threadpool * pool = (_threadpool *) p;
	work_t* cur;	//The q element
	int k;

	while(1) {
		pool->qsize = pool->qsize;
		pthread_mutex_lock(&(pool->qlock));  //get the q lock.


		while( pool->qsize == 0) {	//if the size is 0 then wait.  
			if(pool->shutdown) {
				pthread_mutex_unlock(&(pool->qlock));
				pthread_exit(NULL);
			}
			//wait until the condition says its no emtpy and give up the lock. 
			pthread_mutex_unlock(&(pool->qlock));  //get the qlock.
			pthread_cond_wait(&(pool->q_not_empty),&(pool->qlock));

			//check to see if in shutdown mode.
			if(pool->shutdown) {
				pthread_mutex_unlock(&(pool->qlock));
				pthread_exit(NULL);
			}
		}

		cur = pool->qhead;	//set the cur variable.  

		pool->qsize--;		//decriment the size.  

		if(pool->qsize == 0) {
			pool->qhead = NULL;
			pool->qtail = NULL;
		}
		else {
			pool->qhead = cur->next;
		}

		if(pool->qsize == 0 && ! pool->shutdown) {
			//the q is empty again, now signal that its empty.
			pthread_cond_signal(&(pool->q_empty));
		}
		pthread_mutex_unlock(&(pool->qlock));
		(cur->routine) (cur->arg);   //actually do work.
		free(cur);						//free the work storage.  	
	}
}

threadpool create_threadpool(int num_threads_in_pool) {
  _threadpool *pool;
	int i;

  // sanity check the argument
  if ((num_threads_in_pool <= 0) || (num_threads_in_pool > MAXT_IN_POOL))
    return NULL;

  pool = (_threadpool *) malloc(sizeof(_threadpool));
  if (pool == NULL) {
    fprintf(stderr, "Out of memory creating a new threadpool!\n");
    return NULL;
  }

  pool->threads = (pthread_t*) malloc (sizeof(pthread_t) * num_threads_in_pool);

  if(!pool->threads) {
    fprintf(stderr, "Out of memory creating a new threadpool!\n");
    return NULL;	
  }

  pool->num_threads = num_threads_in_pool; //set up structure members
  pool->qsize = 0;
  pool->qhead = NULL;
  pool->qtail = NULL;
  pool->shutdown = 0;
  pool->dont_accept = 0;

  //initialize mutex and condition variables.  
  if(pthread_mutex_init(&pool->qlock,NULL)) {
    fprintf(stderr, "Mutex initiation error!\n");
	return NULL;
  }
  if(pthread_cond_init(&(pool->q_empty),NULL)) {
    fprintf(stderr, "CV initiation error!\n");	
	return NULL;
  }
  if(pthread_cond_init(&(pool->q_not_empty),NULL)) {
    fprintf(stderr, "CV initiation error!\n");	
	return NULL;
  }

  //make threads

  for (i = 0;i < num_threads_in_pool;i++) {
	  if(pthread_create(&(pool->threads[i]),NULL,do_work,pool)) {
	    fprintf(stderr, "Thread initiation error!\n");	
		return NULL;		
	  }
  }
  return (threadpool) pool;
}


void dispatch(threadpool from_me, dispatch_fn dispatch_to_here,
	      void *arg) {
  _threadpool *pool = (_threadpool *) from_me;
	work_t * cur;
	int k;

	k = pool->qsize;

	//make a work queue element.  
	cur = (work_t*) malloc(sizeof(work_t));
	if(cur == NULL) {
		fprintf(stderr, "Out of memory creating a work struct!\n");
		return;	
	}

	cur->routine = dispatch_to_here;
	cur->arg = arg;
	cur->next = NULL;

	pthread_mutex_lock(&(pool->qlock));

	if(pool->dont_accept) { //Just incase someone is trying to queue more
		free(cur); //work structs.  
		return;
	}
	if(pool->qsize == 0) {
		pool->qhead = cur;  //set to only one
		pool->qtail = cur;
		pthread_cond_signal(&(pool->q_not_empty));  //I am not empty.  
	} else {
		pool->qtail->next = cur;	//add to end;
		pool->qtail = cur;			
	}
	pool->qsize++;
	pthread_mutex_unlock(&(pool->qlock));  //unlock the queue.
}

void destroy_threadpool(threadpool destroyme) {
	_threadpool *pool = (_threadpool *) destroyme;
	void* nothing;
	int i = 0;
/*
	pthread_mutex_lock(&(pool->qlock));
	pool->dont_accept = 1;
	while(pool->qsize != 0) {
		pthread_cond_wait(&(pool->q_empty),&(pool->qlock));  //wait until the q is empty.
	}
	pool->shutdown = 1;  //allow shutdown
	pthread_cond_broadcast(&(pool->q_not_empty));  //allow code to return NULL;
	pthread_mutex_unlock(&(pool->qlock));

	//kill everything.  
	for(;i < pool->num_threads;i++) {
//		pthread_cond_broadcast(&(pool->q_not_empty));
//allowcode to return NULL;/
		pthread_join(pool->threads[i],&nothing);
	}
*/
	free(pool->threads);

	pthread_mutex_destroy(&(pool->qlock));
	pthread_cond_destroy(&(pool->q_empty));
	pthread_cond_destroy(&(pool->q_not_empty));
	return;
}

其中,work_st實際上就是task func的一個wrapper,其中包括了work線程需要執行的routine和參數表;在thread_pool結構體的定義中我們也用到了這個struct,結構體中我們聲明了需要的lock和thread pool中線程的數據結構queue。這里其實可以理解為這個queue就看做是一個bounded-buffer,我們有生產者調用thread pool,讓任務加入到buffer中,也會有worker搞定buffer中的任務成為消費者,我們要做的實際上就是維持這個過程。這里其實可以不使用counter統計bound-buffer中的線程數量,來減少一個臨界區的使用(counter++),但是我們需要在消費者和生產者中同時保有tail和head指針用來比較。
在 create_threadpool中,函數聲明了必要的資源之后,就是在一個循環中pthread_created線程,對應的routine則是do_work,do_work則會先整理/更新queue數據結構,並且在臨界區中當自身pool中activate thread num == 0 && !(pool not shutdown)的情況下wait對應的signal以備執行,若有需要執行的隊列,則會:

 (cur->routine) (cur->arg);   //actually do work.

執行這個線程所對應的routine。

semaphore

這個東西可以看作是lock的一個自然延申。也就是一個資源可以同時被多少執行單元使用。我們之前講到的lock就可以看做是一個binary semaphore。這里就只是簡要的談談,因為這個東西使用的時候很讓人頭大,弄不好就會死鎖。而且雖然semaphore屬於POSIX標准,但是嚴格來講的話,它不屬於pthread。
使用簡談:

#include <semaphore.h>

sem_t sem;
sem_init(&sem);
sem_wait(&sem);

// critical section

sem_post(&sem);
sem_destroy(&sem);

semaphore可以控制同一個時間有多少thread可以訪問臨界區,需要注意的就是上面聲明--釋放的匹配關系不要忘記。為啥不想多談談這個呢?你想想,你都有了thread pool,為啥還要玩這個呢?

總結

總的來說,pthread這個東西確實功能非常的強大,但是同時實現上也是比較復雜。高級語言如java/python中已經有了monitor這樣的工具可以直接使用,不需要我們用這個木頭輪子了。在科學計算這一塊上,pthread主要是為openmp這樣high level一點的編譯器特性做支持,日常使用的使用可以用pthread處理一些比較特殊的情況,而主要還是用openmp,mpi,這樣更直觀的編程模式比較好。
希望疫情可以早點過去,大家的生活重新回到正軌!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM