Bash腳本實現批量作業並行化


http://jerkwin.github.io/2013/12/14/Bash%E8%84%9A%E6%9C%AC%E5%AE%9E%E7%8E%B0%E6%89%B9%E9%87%8F%E4%BD%9C%E4%B8%9A%E5%B9%B6%E8%A1%8C%E5%8C%96/

在Linux下運行作業時, 經常會遇到以下情形: 有大量作業需要運行, 完成每個作業所需要的時間也不是很長. 如果我們以串行方式來運行這些作業, 可能要耗費較長的時間; 若采用並行方式運行則可以大大節約運行時間. 再者, 目前的計算機絕大部分都是多核架構, 要想充分發揮它們的計算能力也需要並行化. 總結網上看到的資料, 利用Bash腳本, 可以采用下面幾種方法實現批量作業的並行化. 注意, 下面論述中將不會區分進程和線程, 也不會區分並行和並發.

1. 采用GNU的paralle程序

parallel是GNU專門用於並行化的一個程序, 對於簡單的批量作業並行化非常合適. 使用parallel不需要編寫腳本, 只需在原命令的基礎上簡單地加上parallel就可以了. 所以, 如果能用paralle並行化你的作業, 請優先使用. 有關paralle的詳細說明, 請參考其官方文檔.

2. 最簡單的並行化方法: &+wait

利用Bash的后台運行&wait函數, 可實現最簡單的批量作業並行化. 如下面的代碼, 串行執行大約需要10秒

# Language: bash
for((i=1; i<=3; i++)); do {
	sleep 3
	echo "DONE!"
} done

改為下面的簡單並行代碼理想情況下可將運行時間壓縮到3秒左右

# Language: bash
for((i=1; i<=3; i++)); do {
	sleep 3
	echo "DONE!"
} & done
wait

3. 進程數可控的並行化方法(1): 模擬隊列

使用Bash腳本同時運行多個進程並無困難, 主要存在的問題是如何控制同時運行的進程數目. 上面的簡單並行化方法使用時進程數無法控制, 因而功能有限, 因為大多數時候我們需要運行的作業數遠遠超過可用處理器數, 這種情況下若大量作業同時在后台運行, 會導致運行速度變慢, 並行效率大大下降. 一種簡單的解決方案就是模擬一個限定最大進程數的隊列, 以進程PID做為隊列元素, 每隔一定時間檢查隊列, 若隊列中有作業完成, 則添加新的作業到隊列中. 這種方法還可以避免由於不同作業耗時不同而產生的無用等待. 下面是根據網上的代碼改寫的一種實現. 實用性更強的代碼, 請參考原文.

# Language: bash
Njob=10    # 作業數目
Nproc=5    # 可同時運行的最大作業數

function CMD {        # 測試命令, 隨機等待幾秒鍾
	n=$((RANDOM % 5 + 1))
	echo "Job $1 Ijob $2 sleeping for $n seconds ..."
	sleep $n
	echo "Job $1 Ijob $2 exiting ..."
}
function PushQue {    # 將PID壓入隊列
	Que="$Que $1"
	Nrun=$(($Nrun+1))
}
function GenQue {     # 更新隊列
	OldQue=$Que
	Que=""; Nrun=0
	for PID in $OldQue; do
		if [[ -d /proc/$PID ]]; then
			PushQue $PID
		fi
	done
}
function ChkQue {     # 檢查隊列
	OldQue=$Que
	for PID in $OldQue; do
		if [[ ! -d /proc/$PID ]] ; then
			GenQue; break
		fi
	done
}

for((i=1; i<=$Njob; i++)); do
	CMD $i &
	PID=$!
	PushQue $PID
	while [[ $Nrun -ge $Nproc ]]; do
		ChkQue
		sleep 1
	done
done
wait

一個更簡潔的方法是記錄PID到數組, 通過檢查PID存在與否以確定作業是否運行完畢. 可實現如下

# Language: bash
Njob=10    # 作業數目
Nproc=5    # 可同時運行的最大作業數

function CMD {        # 測試命令, 隨機等待幾秒鍾
	n=$((RANDOM % 5 + 1))
	echo "Job $1 Ijob $2 sleeping for $n seconds ..."
	sleep $n
	echo "Job $1 Ijob $2 exiting ..."
}

PID=() # 記錄PID到數組, 檢查PID是否存在以確定是否運行完畢
for((i=1; i<=Njob; )); do
	for((Ijob=0; Ijob<Nproc; Ijob++)); do
		if [[ $i -gt $Njob ]]; then
			break;
		fi
		if [[ ! "${PID[Ijob]}" ]] || ! kill -0 ${PID[Ijob]} 2> /dev/null; then
			CMD $i $Ijob &
			PID[Ijob]=$!
			i=$((i+1))
		fi
	done
	sleep 1
done
wait

3. 進程數可控的並行化方法(2): 命名管道

上面的並行化方法也可利用命名管道來實現, 命名管道是Linux下進程間進行通訊的一種方法, 也稱為先入先出(fifo, first in first out)文件. 具體方法是創建一個fifo文件, 作為進程池, 里面存放一定數目的”令牌”. 作業運行規則如下: 所有作業排隊依次領取令牌; 每個作業運行前從進程池中領取一塊令牌, 完成后再歸還令牌; 當進程池中沒有令牌時, 要運行的作業只能等待. 這樣就能保證同時運行的作業數等於令牌數. 前面的模擬隊列方法實際就是以PID作為令牌的實現.

據我已查看的資料, 這種方法在網絡上討論最多. 實現也很簡潔, 但理解其代碼需要的Linux知識較多. 下面是我改寫的示例代碼及其注釋.

# Language: bash
Njob=10    # 作業數目
Nproc=5    # 可同時運行的最大作業數

function CMD {        # 測試命令, 隨機等待幾秒鍾
	n=$((RANDOM % 5 + 1))
	echo "Job $1 Ijob $2 sleeping for $n seconds ..."
	sleep $n
	echo "Job $1 Ijob $2 exiting ..."
}

Pfifo="/tmp/$$.fifo"   # 以PID為名, 防止創建命名管道時與已有文件重名,從而失敗
mkfifo $Pfifo          # 創建命名管道
exec 6<>$Pfifo         # 以讀寫方式打開命名管道, 文件標識符fd為6
                       # fd可取除0, 1, 2,5外0-9中的任意數字
rm -f $Pfifo           # 刪除文件, 也可不刪除, 不影響后面操作

# 在fd6中放置$Nproc個空行作為令牌
for((i=1; i<=$Nproc; i++)); do
	echo
done >&6

for((i=1; i<=$Njob; i++)); do  # 依次提交作業
	read -u6                   # 領取令牌, 即從fd6中讀取行, 每次一行
                               # 對管道,讀一行便少一行,每次只能讀取一行
                               # 所有行讀取完畢, 執行掛起, 直到管道再次有可讀行
                               # 因此實現了進程數量控制
	{                          # 要批量執行的命令放在大括號內, 后台運行
		CMD $i && {            # 可使用判斷子進程成功與否的語句
			echo "Job $i finished"
		} || {
			echo "Job $i error"
		}
		sleep 1     # 暫停1秒,可根據需要適當延長,
                    # 關鍵點,給系統緩沖時間,達到限制並行進程數量的作用
		echo >&6    # 歸還令牌, 即進程結束后,再寫入一行,使掛起的循環繼續執行
	} &

done

wait                # 等待所有的后台子進程結束
exec 6>&-           # 刪除文件標識符

注意:

(1) exec 6<>$Pfifo 這一句很重要, 若無此語句, 向$Pfifo寫入數據時, 程序會被阻塞, 直到有read讀出了文件中的數據為止. 而執行了此語句, 就可以在程序運行期間不斷向文件寫入數據而不會阻塞, 並且數據會被保存下來以供read讀出.

(2) 當$Pfifo中已經沒有數據時, read無法讀到數據, 進程會被阻塞在read操作上, 直到有子進程運行結束, 向$Pfifo寫入一行.

(3) 核心執行部分也可使用如下方式

# Language: bash
for((i=1; i<=$Njob; i++)); do
	read -u6
	(CMD $i; sleep 1; echo >&6) &
done

{}和()的區別在shell是否會衍生子進程

(4) 此方法在目前的Cygwin(版本1.7.27)下無法使用, 因其不支持雙向命名管道. 有人提到一個解決方案, 使用兩個文件描述符來替代單個文件描述符, 但此方法我沒有測試成功.

參考資料

  1. 簡潔的模擬隊列方法實現

    shell里如何實現”多線程”?

  2. 模擬隊列方法, 實用性更強的代碼

    A srcipt for running processes in parallel in Bash

  3. 命名管道方法及其解釋
  4. Cygwin下雙向命名管道的問題: bi-directional named pipe


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM