【說解】在shell中通過mkfifo創建命名管道來控制多個進程並發執行


背景:

工作中有兩個異地機房需要傳數據,數據全名很規范,在某個目錄下命名為統一的前綴加上編號。如/path/from/file.{1..100}。而機房間的專線對單個scp進程的傳輸速度是有限制的,比如最大在100Mb/s,如果直接啟動100個scp,則又會遇到ssh的並發連接數限制。

所以需要控制並發數,即不超過ssh的並發限制,又要讓單網卡上的帶寬接近飽和,盡快完成傳輸(假設專線帶寬遠大於單機網卡帶寬)

實現

之前知道通過mkfifo創建一個命名管道,可以實現對並發的控制。現在來實現一個。

在此之前,如果對mkfifo不了解,可以參考這個連接,作者寫得很詳細,我就不造輪子了。

這里直接給出代碼,並做一些解釋。因為單進程的帶寬如上所述,所以考慮9個並發。代碼如下:

 1 #!/bin/bash
 2 
 3 your_func()
 4 {   # use your cmd or func instead of sleep here. don't end with background(&)
 5     date +%s
 6     echo "scp HOSTNAME:/home/USER/path/from/file.$1 REMOTE_HOST:/home/USER/path/to/"
 7     sleep 2
 8 }
 9 
10 concurrent()
11 {   # from $1 to $2, (included $1,$2 itself), con-current $3 cmd
12     start=$1 && end=$2 && cur_num=$3
13 
14     # ff_file which is opened by fd 4 will be really removed after script stopped
15     mkfifo   ./fifo.$$ &&  exec 4<> ./fifo.$$ && rm -f ./fifo.$$
16 
17     # initial fifo: write $cur_num line to $ff_file
18     for ((i=$start; i<$cur_num+$start; i++)); do
19         echo "init time add $i" >&4
20     done
21 
22     for((i=$start; i<=$end; i++)); do
23         read -u 4   # read from mkfifo file
24         {   # REPLY is var for read
25             echo -e "-- current loop: [cmd id: $i ; fifo id: $REPLY ]"
26 
27             your_func $i
28             echo "real time add $(($i+$cur_num))"  1>&4 # write to $ff_file
29         } & # & to backgroud each process in {}
30     done
31     wait    # wait all con-current cmd in { } been running over
32 }
33 
34 concurrent 0 8 3

上面以3為並發數,執行0到8號共9次,以便顯示如下執行結果。

 1 bash concurrent.sh
 2 -- current loop: [cmd id: 0 ; fifo id: init time add 0 ]
 3 -- current loop: [cmd id: 1 ; fifo id: init time add 1 ]
 4 -- current loop: [cmd id: 2 ; fifo id: init time add 2 ]
 5 1453518400
 6 1453518400
 7 scp HOSTNAME:/home/USER/path/from/file.0 REMOTE_HOST:/home/USER/path/to/
 8 scp HOSTNAME:/home/USER/path/from/file.2 REMOTE_HOST:/home/USER/path/to/
 9 1453518400
10 scp HOSTNAME:/home/USER/path/from/file.1 REMOTE_HOST:/home/USER/path/to/
11 -- current loop: [cmd id: 3 ; fifo id: real time add 3 ]
12 -- current loop: [cmd id: 4 ; fifo id: real time add 5 ]
13 -- current loop: [cmd id: 5 ; fifo id: real time add 4 ]
14 1453518402
15 scp HOSTNAME:/home/USER/path/from/file.3 REMOTE_HOST:/home/USER/path/to/
16 1453518402
17 1453518402
18 scp HOSTNAME:/home/USER/path/from/file.5 REMOTE_HOST:/home/USER/path/to/
19 scp HOSTNAME:/home/USER/path/from/file.4 REMOTE_HOST:/home/USER/path/to/
20 -- current loop: [cmd id: 6 ; fifo id: real time add 6 ]
21 -- current loop: [cmd id: 7 ; fifo id: real time add 7 ]
22 -- current loop: [cmd id: 8 ; fifo id: real time add 8 ]
23 1453518404
24 scp HOSTNAME:/home/USER/path/from/file.6 REMOTE_HOST:/home/USER/path/to/
25 1453518404
26 1453518404
27 scp HOSTNAME:/home/USER/path/from/file.7 REMOTE_HOST:/home/USER/path/to/
28 scp HOSTNAME:/home/USER/path/from/file.8 REMOTE_HOST:/home/USER/path/to/

從date輸出的時間上,可以看出,每2秒會執行3個並發。

說明

整體過程

設N的值為並發數。通過在fifo中初始化N行內容(可以為空值),再利用fifo的特性,從fifo中每讀一行,啟動一次your_func調用,當fifo讀完N次時,fifo為空。再讀時就會阻塞。這樣開始執行時就是N個並發(1-N)。

當並發執行的進程your_func,任意一個完成操作時,下一步會招待如下語句:

echo "real time add $(($i+$cur_num))"  1>&4

這樣就對fifo新寫入了一行,前面被阻塞的第N+1號待執行的進程read成功,開始進入{}語句塊執行。這樣通過read fifo的阻塞功能,實現了並發數的控制。

需要注意的是,當並發數較大時,多個並發進程即使在使用sleep相同秒數模擬時,也會存在進程調度的順序問題,因而並不是按啟動順序結束的,可能會后啟動的進程先結束。

從而導致如下語句所示的輸出中,兩個數字並不一定是相等的。並發數越大,這種差異性越大。

-- current loop: [cmd id: 8 ; fifo id: real time add 9 ]

自定義函數

修改自定義函數your_func,這個函數實際只需要一行就完成了。

your_func()
{   # use your cmd or func instead of sleep here. don't end with background(&)
    date +%s
    scp HOSTNAME:/home/USER/path/from/file.$1 REMOTE_HOST:/home/USER/path/to/
}

需要注意的是,scp命令最后不需要添加壓后台的&符號。因為在上一級就已經壓后台並發了。

再來說明concurrent函數的第14行。

exec digit<>  filename

這是一個平常很少使用到的命令。特別是‘<>’這個符號。既然不明白我們來查一下系統幫助。

man bash
# search 'exec '

Opening File Descriptors for Reading and Writing
       The redirection operator

              [n]<>word

       causes  the  file  whose  name  is  the  expansion  of word to be opened for both reading and writing on file
       descriptor n, or on file descriptor 0 if n is not specified.  If the file does not exist, it is created.

通過man bash來搜索exec加空格,會找到對exec的說明。注意如果直接man exec,會搜索到linux programer's manual,是對execl, execlp, execle, execv, execvp, execvpe - execute a file這一堆系統函數的調用說明。

還要注意哦,4<> 這幾個字符不要加空格,必然連着寫。word前可以加空格。

rm file

mkfifo先創建管道文件,再通過exec將該文件綁定到文件描述符4。也許你在疑惑后面的rm操作。其實當該文件綁定到文件描述符后,內核已經通過open系統調用打開了該文件,這個時候執行rm操作,刪除的是文件的Inode,但concurrent函數已經連接到文件的block塊區。

如果你遇到過這樣的情況,你就明白了:如果線上的nginx日志是沒有切分的,access.log會越來越大,這時你直接rm access.log文件后,文件不見了,但df查看系統並沒有釋放磁盤空間。這就是因為rm只是刪除了inode,但這之前nginx早已經通過open打開了這個文件,nginx進程的進程控制塊中的文件描述符表中對應的fd,已經有相應的文件指針指向了該文件在內存中的文件表,以及其在內存中的v節點表,並最終指向文件的實際存儲塊。因此nginx依然可以繼續寫日志,磁盤還在被寫入。只有重啟或者reload,讓進程重新讀一次配置,重新打開一遍相應的文件時,才會發現該文件不存在的,並新建該文件。而這時因為Inode節點已經釋放,再用df查看時就能看到可用空間增大了。

不懂可以參考APUE的圖3.1及想着說明。

因此14行的rm並不影響后繼腳本執行,直到腳本結束,系統收回所有文件描述符。

初始化

18-20行在做初始化管道的工作。其中讀取管道有兩類寫法:

1 # style 1
2     for ((i=$start; i<$cur_num+$start; i++)); do
3         echo "init time add $i" >&4
4     done
5 
6 # style 2
7     for ((i=$start; i<$cur_num+$start; i++)); do
8         echo "init time add $i"
9     done >&4

差別就是‘>&4’ 這幾個字符放在echo語句后面,還是放在done后面,兩者都可以,前者針對echo語句,后者針對整個for循環。

同理,在下一個for循環中,read命令也有兩種方式:

# style 1
for((i=$start; i<=$end; i++)); do
        read -u 4   
        {   
            your_func $i
            echo "real time add $(($i+$cur_num))"  1>&4 # write to $ff_file
        } & 
done

# style 2
for((i=$start; i<=$end; i++)); do
        read   
        {   
            your_func $i
            echo "real time add $(($i+$cur_num))"  1>&4 # write to $ff_file
        } & 
done <&4

關於REPLY

再解釋一下REPLY變量。這是上述循環中,用來存放read命令從fifo中讀到的內容。其實在整個腳本中,是不需要關注這個點的。不過這里隨帶也解釋一下。

通過能fifo的不斷讀寫,才實現了echo如下語句:

-- current loop: [cmd id: 7 ; fifo id: real time add 7 ]

如何了解到REPLY呢?我們又得man一下了。為了找到read的參數。先man read發現不對。再如下查找,因為read是bash自建命令。

1 man  bash 
2 # search  'Shell Variables'
3 
4        REPLY  Set to the line of input read by the read builtin command when no arguments are supplied.

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM