Python3 與 C# 並發編程之~進程先導篇


 

在線預覽:http://github.lesschina.com/python/base/concurrency/1.並發編程~進程先導篇.html

Python3 與 C# 並發編程之~ 進程篇:https://www.cnblogs.com/dotnetcrazy/p/9426279.html

Linux專項

先寫幾個問號來概況下今天准備說的內容:(謎底自己解開,文中都有)

  1. 你知道Ctrl+C終止進程的本質嗎?你知道Kill -9 pid的真正含義嗎?
  2. 你知道那些跨平台框架(Python,NetCore)在Linux下創建進程干了啥?
  3. 你了解僵屍進程孤兒進程的悲催生產史嗎?孤兒找干爹僵屍送往生想知道不?
  4. 想知道創建子進程后怎么李代桃僵嗎?ps aux | grep xxx的背后到底隱藏了什么?
  5. 你了解Linux磁盤中p類型的文件到底是個啥嗎?
  6. 為什么要realase發布而不用debug直接部署?這背后的性能相差幾何?
  7. 還有更多進程間的密密私語等着你來查看哦~

關於幫助文檔的說明:

  • 所有用到的系統函數你都可以使用man查看,eg:man 2 pipe
  • Python里面的方法你都可以通過help查看,eg:help(os.pipe)

1.概念引入

正式講課之前,先說些基本概念,難理解的用程序跑跑然后再理解:如有錯誤歡迎批評指正

並發 :一個時間段中有幾個程序都處於已啟動運行到運行完畢之間,且這幾個程序都是在同一個處理機上運行,但任一個時刻點上只有一個程序在處理機上運行。

並行 :當一個CPU執行一個線程時,另一個CPU可以執行另一個線程,兩個線程互不搶占CPU資源,可以同時進行,這種方式我們稱之為並行(Parallel)。(在同一個時間段內,兩個或多個程序執行,有時間上的重疊)


通俗的舉個例子:

小明、小潘、小張、小康去食堂打飯,4個小伙子Coding了3天,餓爆了,現在需要1分鍾內讓他們都吃上飯,不然就有可怕的事情發生。

按照正常的流程,1分鍾可能只夠他們一個人打飯,這不行啊,於是有了幾種處理方法:

並發:快快快,一人先吃一口,輪着來,一直喂到你們都飽了(只有一個食堂打飯的窗口)(單核CPU)

並行

  • 開了2~3個窗口,和上面處理一樣,只是競爭的強度沒那么大了
  • 開了4個窗口,不着急,一人一個窗口妥妥的

對於操作系統來說,一個任務就是一個進程(Process),比如打開一個瀏覽器就是啟動一個瀏覽器進程,打開兩個瀏覽器就啟動了兩個瀏覽器進程。

有些進程還不止同時干一件事,比如Word,它可以同時進行打字、拼寫檢查、打印等事情。在一個進程內部,要同時干多件事,就需要同時運行多個“子任務”,我們把進程內的這些“子任務”稱為線程(Thread)

由於每個進程至少要干一件事,所以,一個進程至少有一個線程。像Word這種復雜的進程可以有多個線程,多個線程可以同時執行,多線程的執行方式和多進程是一樣的,也是由操作系統在多個線程之間快速切換,讓每個線程都短暫地交替運行,看起來就像同時執行一樣。

通俗講:線程是最小的執行單元,而進程由至少一個線程組成。如何調度進程和線程,完全由操作系統決定,程序自己不能決定什么時候執行,執行多長時間


PS:進程5態下次正式講程序的時候會說,然后就是==> 程序實戰不會像今天這樣繁瑣的,Code很簡單,但是不懂這些基本概念往后會吃很多虧,逆天遇到太多坑了,所以避免大家入坑,簡單說說這些概念和一些偏底層的東西~看不懂沒事,有個印象即可,以后遇到問題至少知道從哪個方向去解決

 

2.進程相關

示例代碼:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/Linux

2.1.fork引入

示例代碼:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/Linux/base

(linux/unix)操作系統提供了一個fork()系統調用。普通的函數調用,調用一次,返回一次,但是fork()一次調用,兩次返回

因為操作系統自動把父進程復制了一份,分別在父進程和子進程內返回。為了便於區分,操作系統是這樣做的:子進程永遠返回0,而父進程返回子進程的ID

查看下幫助文檔:

import os

help(os.fork)
Help on built-in function fork in module posix:

fork()
    Fork a child process.

    Return 0 to child process and PID of child to parent process.

我們來跑個程序驗證一下:(PID返回值如果小於0一般都是出錯了)

import os

def main():
    print("准備測試~PID:%d" % os.getpid())
    pid = os.fork()
    if pid == 0:
        print("子進程:PID:%d,PPID:%d" % (os.getpid(), os.getppid()))
    elif pid > 0:
        print("父進程:PID:%d,PPID:%d" % (os.getpid(), os.getppid()))

if __name__ == '__main__':
    main()

結果:

准備測試~PID:11247
父進程:PID:11247,PPID:11229
子進程:PID:11248,PPID:11247

可以查看下這個進程是啥: PPID.png

這個指令如果還不熟悉,Linux基礎得好好復習下了:https://www.cnblogs.com/dunitian/p/4822807.html,簡單分析下吧:a是查看所有(可以聯想ls -a),u是顯示詳細信息,x是把不依賴終端的進程也顯示出來(終端可以理解為:人與機器交互的那些)

技巧:指令學習可以遞增式學習:psps a ps au ps aux ps ajx

現在驗證一下復制一份是什么意思:(代碼原封不動,只是在最下面添加了一行輸出)

import os

def main():
    print("准備測試~PID:%d" % os.getpid())
    pid = os.fork() # 子進程被父進程fork出來后,在fork處往下執行

    if pid == 0:
        print("子進程:PID:%d,PPID:%d" % (os.getpid(), os.getppid()))
    elif pid > 0:
        print("父進程:PID:%d,PPID:%d" % (os.getpid(), os.getppid()))

    print("PID:%d,我是賣報的小行家,大風大雨都不怕" % os.getpid())

if __name__ == '__main__':
    main()

輸出:(父子進程的執行順序是系統調度決定的)

准備測試~PID:13081
父進程:PID:13081,PPID:9388
PID:13081,我是賣報的小行家,大風大雨都不怕
子進程:PID:13083,PPID:13081
PID:13083,我是賣報的小行家,大風大雨都不怕

的確是Copy了一份,Code都一樣(玩過逆向的應該知道,這份Code其實就放在了.text(代碼段)里面

子進程被父進程fork出來后,在fork處往下執行(Code和父進程一樣),這時候他們就為了搶CPU各自為戰了

最后驗證一下:各個進程地址空間中數據是完全獨立的(有血緣關系的則是:讀時共享,寫時復制,比如父子進程等)

import os

def main():
    num = 100
    pid = os.fork()
    # 子進程
    if pid == 0:
        num += 10
    elif pid > 0:
        num += 20

    print("PID:%d,PPID:%d,Num=%d" % (os.getpid(), os.getppid(), num))

if __name__ == '__main__':
    main()

輸出:(進程間通信下一節課會系統的講,今天只談Linux和概念)

PID:6369,PPID:6332,Num=120
PID:6376,PPID:6369,Num=110

擴展:(簡單了解下即可)

  1. 程序:二進制文件(占用磁盤)
  2. 進程:啟動的程序(所有數據都在內存中,需要占用CPU、內存等資源)
  3. 進程是CPU、內存、I/0設備的抽象(各個進程地址空間中數據是完全獨立的
  4. 0號進程是Linux內核進程(這么理解:初代吸血鬼)
  5. 1號進程是用戶進程,所有進程的創建或多或少和它有關系(init or systemd
  6. 2號進程和1號進程一樣,都是0號進程創建的,所有線程調度都和他有關系

先看看Linux啟動的圖示:(圖片來自網絡) 2.Linux_Start

查看一下init進程 Ubuntu

CentOS進行了優化管理~systemd CentOS

其實程序開機啟動方式也可以知道區別了:systemctl start mongodb.service and sudo /etc/init.d/ssh start

Win系列的0號進程: win


第5點的說明:(以遠程CentOS服務器為例) pstree -ps

systemd(1)─┬─NetworkManager(646)─┬─{NetworkManager}(682)
           │                     └─{NetworkManager}(684)
           ├─agetty(1470)
           ├─auditd(600)───{auditd}(601)
           ├─crond(637)
           ├─dbus-daemon(629)───{dbus-daemon}(634)
           ├─firewalld(645)───{firewalld}(774)
           ├─lvmetad(483)
           ├─master(1059)─┬─pickup(52930)
           │              └─qmgr(1061)
           ├─polkitd(627)─┬─{polkitd}(636)
           │              ├─{polkitd}(639)
           │              ├─{polkitd}(640)
           │              ├─{polkitd}(642)
           │              └─{polkitd}(643)
           ├─rsyslogd(953)─┬─{rsyslogd}(960)
           │               └─{rsyslogd}(961)
           ├─sshd(950)───sshd(50312)───sshd(50325)───bash(50326)───pstree(54258)
           ├─systemd-journal(462)
           ├─systemd-logind(626)
           ├─systemd-udevd(492)
           └─tuned(951)─┬─{tuned}(1005)
                        ├─{tuned}(1006)
                        ├─{tuned}(1007)
                        └─{tuned}(1048)

再看一個例子:

[dnt@localhost ~]$ pstree dnt -ps
sshd(50325)───bash(50326)───pstree(54471)
[dnt@localhost ~]$ pstree 50325 -ps
systemd(1)───sshd(950)───sshd(50312)───sshd(50325)───bash(50326)───pstree(54489)

其實你可以在虛擬機試試干死1號進程,就到了登錄頁面了【現在大部分系統都不讓你這么干了】 kill -9 1

-bash: kill: (1) - 不允許的操作
 

2.2.僵屍進程和孤兒進程

示例代碼:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/Linux/base

先看看定義:

孤兒進程 :一個父進程退出,而它的一個或多個子進程還在運行,那么那些子進程將成為孤兒進程。孤兒進程將被init進程(進程號為1)所收養,並由init進程對它們完成狀態收集工作。

僵屍進程 :一個進程使用fork創建子進程,如果子進程退出,而父進程並沒有調用wait或waitpid獲取子進程的狀態信息,那么子進程的進程描述符仍然保存在系統中。這種進程稱之為僵死進程。

通俗講就是:

孤兒進程:你爸在你之前死了,你成了孤兒,然后你被進程1收養,你死后的事宜你干爹幫你解決

僵屍進程:你掛了,你爸忙着干其他事情沒有幫你安葬,你變成了孤魂野鬼,你的怨念一直長存世間

舉個例子看看:

import os
import time

def main():
    pid = os.fork()
    if pid == 0:
        print("子進程:Pid=%d,PPID=%d" % (os.getpid(), os.getppid()))
        time.sleep(1)  # 睡1s
    elif pid > 0:
        print("父進程:Pid=%d,PPID=%d" % (os.getpid(), os.getppid()))

    print("pid=%d,over" % os.getpid())

if __name__ == '__main__':
    main()

輸出: 孤兒

import os
import time

def main():
    pid = os.fork()
    if pid == 0:
        print("子進程:Pid=%d,PPID=%d" % (os.getpid(), os.getppid()))
    elif pid > 0:
        print("父進程:Pid=%d,PPID=%d" % (os.getpid(), os.getppid()))
        while True:
            print("父親我忙着呢,沒時間管你個小屁孩")
            time.sleep(1)

    print("pid=%d,over" % os.getpid())

if __name__ == '__main__':
    main()

輸出+測試: 僵屍

其實僵屍進程的危害真的很大,這也就是為什么有些人為了追求效率過度調用底層,不考慮自己實際情況最后發現還不如用自托管的效率高

僵屍進程是殺不死的,必須殺死父類才能徹底解決它們,下面說說怎么讓父進程為子進程‘收屍’


2.3.父進程回收子進程(wait and waitpid)

講解之前先簡單分析一下上面的Linux指令(防止有人不太清楚)

kill -9 pid ==> 以前逆天說過,是無條件殺死進程,其實這種說法不准確,應該是發信號給某個進程

-9指的就是信號道里面的SIGKILL(信號終止),你寫成kill -SIGKILL pid也一樣

-9只是系統給的一種簡化寫法(好像記得1~31信號,各個Linux中都差不多,其他的有點不一樣)

dnt@MZY-PC:~/桌面/work/PC/python/Thread/Linux kill -l
 1) SIGHUP   2) SIGINT   3) SIGQUIT  4) SIGILL   5) SIGTRAP
 6) SIGABRT  7) SIGBUS   8) SIGFPE   9) SIGKILL 10) SIGUSR1
11) SIGSEGV 12) SIGUSR2 13) SIGPIPE 14) SIGALRM 15) SIGTERM
16) SIGSTKFLT   17) SIGCHLD 18) SIGCONT 19) SIGSTOP 20) SIGTSTP
21) SIGTTIN 22) SIGTTOU 23) SIGURG  24) SIGXCPU 25) SIGXFSZ
26) SIGVTALRM   27) SIGPROF 28) SIGWINCH    29) SIGIO   30) SIGPWR
31) SIGSYS  34) SIGRTMIN    35) SIGRTMIN+1  36) SIGRTMIN+2  37) SIGRTMIN+3
38) SIGRTMIN+4  39) SIGRTMIN+5  40) SIGRTMIN+6  41) SIGRTMIN+7  42) SIGRTMIN+8
43) SIGRTMIN+9  44) SIGRTMIN+10 45) SIGRTMIN+11 46) SIGRTMIN+12 47) SIGRTMIN+13
48) SIGRTMIN+14 49) SIGRTMIN+15 50) SIGRTMAX-14 51) SIGRTMAX-13 52) SIGRTMAX-12
53) SIGRTMAX-11 54) SIGRTMAX-10 55) SIGRTMAX-9  56) SIGRTMAX-8  57) SIGRTMAX-7
58) SIGRTMAX-6  59) SIGRTMAX-5  60) SIGRTMAX-4  61) SIGRTMAX-3  62) SIGRTMAX-2
63) SIGRTMAX-1  64) SIGRTMAX

一般搜索進程中的某個程序一般都是用這個:ps -aux | grep xxx|其實就是管道,用於有血緣關系進程間通信,等會講)

如果安裝了pstree就更方便了:pstree 13570 -ps (Ubuntu自帶,CentOS要裝下yum install psmisc

systemd(1)───systemd(1160)───gnome-terminal-(21604)───bash(8169)───python3(13570)───python3(13571)

擴展:我們平時Ctrl+C其實就是給 2)SIGINT發一個信號


2.3.1.wait

代碼實例:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/Linux/wait

步入正題:

Python的Wait和C系列的稍有不同,這邊重點說說Python:

help(os.wait)

Help on built-in function wait in module posix:

wait()
    Wait for completion of a child process.

    Returns a tuple of information about the child process:
        (pid, status)

os.wait()返回一個元組,第一個是進程id,第二個是狀態,正常退出是0,被九號信號干死就返回9

來個案例:

import os
import time

def main():
    pid = os.fork()
    if pid == 0:
        print("子進程:Pid=%d,PPID=%d" % (os.getpid(), os.getppid()))
    elif pid > 0:
        print("父進程:Pid=%d,PPID=%d" % (os.getpid(), os.getppid()))
        wpid, status = os.wait()
        print(wpid)
        print(status)

    print("pid=%d,over" % os.getpid())

if __name__ == '__main__':
    main()

輸出:

父進程:Pid=22322,PPID=10139
子進程:Pid=22323,PPID=22322
pid=22323,over
22323
0
pid=22322,over

演示一下被9號信號干死的情況:

import os
import time

def main():
    pid = os.fork()
    if pid == 0:
        print("子進程:Pid=%d,PPID=%d" % (os.getpid(), os.getppid()))
        while True:
            print("孩子老卵,就是不聽話")
            time.sleep(1)
    elif pid > 0:
        print("父進程:Pid=%d,PPID=%d" % (os.getpid(), os.getppid()))
        wpid, status = os.wait()  # 調用一次只能回收一次,想都回收,就來個while循環,-1則退出
        print(wpid)
        print(status)
        if status == 0:
            print("正常退出")
        elif status == 9:
            print("被信號9干死了")

    print("pid=%d,over" % os.getpid())

if __name__ == '__main__':
    main()

輸出: 回收子進程


擴展:(回收所有子進程,status返回-1代表沒有子進程了,Python里面沒有子進程會觸發異常)

import os
import time

def main():
    i = 0
    while i < 3:
        pid = os.fork()
        # 防止產生孫子進程(可以自己思索下)
        if pid == 0:
            break
        i += 1

    if i == 0:
        print("i=%d,子進程:Pid=%d,PPID=%d" % (i, os.getpid(), os.getppid()))
        time.sleep(1)
    elif i == 1:
        print("i=%d,子進程:Pid=%d,PPID=%d" % (i, os.getpid(), os.getppid()))
        time.sleep(1)
    elif i == 2:
        print("i=%d,子進程:Pid=%d,PPID=%d" % (i, os.getpid(), os.getppid()))
        time.sleep(3)
        while True:
            print("(PID=%d)我又老卵了,怎么滴~" % os.getpid())
            time.sleep(3)
    elif i==3: # 循環結束后,父進程才會退出,這時候i=3
        print("i=%d,父進程:Pid=%d,PPID=%d" % (i, os.getpid(), os.getppid()))
        while True:
            print("等待回收子進程")
            try:
                wpid, status = os.wait()
                print(wpid)
                print(status)
                if status == 0:
                    print("正常退出")
                elif status == 9:
                    print("被信號9干死了")
            except OSError as ex:
                print(ex)
                break

    print("pid=%d,over,ppid=%d" % (os.getpid(), os.getppid()))

if __name__ == '__main__':
    main()

演示:看最后一句輸出,父進程掃尾工作做完就over了 回收所有子進程


2.3.2.waitpid

代碼實例:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/Linux/waitpid

上面說的wait方法是阻塞進程的一種方式,waitpid可以設置不阻塞進程

help(os.waitpid)

Help on built-in function waitpid in module posix:

waitpid(pid, options, /)
    Wait for completion of a given child process.

    Returns a tuple of information regarding the child process:
        (pid, status)

    The options argument is ignored on Windows.

等待進程id為pid的進程結束,返回一個tuple,包括進程的進程ID和退出信息(和os.wait()一樣),參數options會影響該函數的行為。在默認情況下,options的值為0。

  1. 如果pid是一個正數,waitpid()請求獲取一個pid指定的進程的退出信息
  2. 如果pid為0,則等待並獲取當前進程組中的任何子進程的值
  3. 如果pid為-1,則等待當前進程的任何子進程
  4. 如果pid小於-1,則獲取進程組id為pid的絕對值的任何一個進程
  5. 當系統調用返回-1時,拋出一個OSError異常。

官方原話是這樣的:(英語好的可以看看我有沒有翻譯錯)

If pid is greater than 0, waitpid() requests status information for that specific process.
If pid is 0, the request is for the status of any child in the process group of the current process. 
If pid is -1, the request pertains to any child of the current process. 
If pid is less than -1, status is requested for any process in the process group -pid (the absolute value of pid).

options:(宏)

os.WNOHANG - 如果沒有子進程退出,則不阻塞waitpid()調用
os.WCONTINUED - 如果子進程從stop狀態變為繼續執行,則返回進程自前一次報告以來的信息。
os.WUNTRACED - 如果子進程被停止過而且其狀態信息還沒有報告過,則報告子進程的信息。

補充:

  1. 進程組:每一個進程都屬於一個“進程組”,當一個進程被創建的時候,它默認是其父進程所在組的成員(你們一家
  2. 會 話:幾個進程組又構成一個會話(你們小區

用法和wait差不多,就是多了一個不阻塞線程的方法:

import os
import time

def main():
    pid = os.fork()

    if pid == 0:
        print("[子進程]PID:%d,PPID:%d" % (os.getpid(), os.getppid()))
        time.sleep(2)

    elif pid > 0:
        print("[父進程]PID:%d,PPID:%d" % (os.getpid(), os.getppid()))
        while True:
            try:
                wpid, status = os.waitpid(-1, os.WNOHANG)
                if wpid > 0:
                    print("回收子進程wpid:%d,狀態status:%d" % (wpid, status))
            except OSError:
                print("沒有子進程了")
                break

            print("父進程忙着掙錢養家呢~")
            time.sleep(3)

    print("[over]PID:%d,PPID:%d" % (os.getpid(), os.getppid()))

if __name__ == '__main__':
    main()

輸出:

[父進程]PID:1371,PPID:29604
[子進程]PID:1372,PPID:1371
父進程忙着掙錢養家呢~
[over]PID:1372,PPID:1371
回收子進程wpid:1372,狀態status:0
父進程忙着掙錢養家呢~
沒有子進程了
[over]PID:1371,PPID:29604

2.3.3.wait3 and wait4

代碼實例:https://github.com/lotapp/BaseCode/blob/master/python/5.concurrent/Linux/wait3.py

help(os.wait3)

Help on built-in function wait3 in module posix:

wait3(options)
    Wait for completion of a child process.

    Returns a tuple of information about the child process:
      (pid, status, rusage)
help(os.wait4)

Help on built-in function wait4 in module posix:

wait4(pid, options)
    Wait for completion of a specific child process.

    Returns a tuple of information about the child process:
      (pid, status, rusage)

這個是Python擴展的方法,用法和wait、waitpid差不多,我就不一個個的舉例子了,以wait3為例

import os
import time

def main():
    pid = os.fork()

    if pid == 0:
        print("[子進程]PID:%d,PPID:%d" % (os.getpid(), os.getppid()))
        time.sleep(2)

    elif pid > 0:
        print("[父進程]PID:%d,PPID:%d" % (os.getpid(), os.getppid()))
        while True:
            try:
                wpid, status, rusage = os.wait3(os.WNOHANG)
                if wpid > 0:
                    print("回收子進程wpid:%d,狀態status:%d\n詳細信息:%s" % (wpid, status,
                                                                 rusage))
            except OSError:
                print("沒有子進程了")
                break

            print("父進程忙着掙錢養家呢~")
            time.sleep(3)

    print("[over]PID:%d,PPID:%d" % (os.getpid(), os.getppid()))

if __name__ == '__main__':
    main()

輸出

[父進程]PID:2638,PPID:29604
[子進程]PID:2639,PPID:2638
父進程忙着掙錢養家呢~
[over]PID:2639,PPID:2638
回收子進程wpid:2639,狀態status:0
詳細信息:resource.struct_rusage(ru_utime=0.0052179999999999995, ru_stime=0.0052179999999999995, ru_maxrss=7032, ru_ixrss=0, ru_idrss=0, ru_isrss=0, ru_minflt=869, ru_majflt=0, ru_nswap=0, ru_inblock=0, ru_oublock=0, ru_msgsnd=0, ru_msgrcv=0, ru_nsignals=0, ru_nvcsw=2, ru_nivcsw=0)
父進程忙着掙錢養家呢~
沒有子進程了
[over]PID:2638,PPID:29604

擴展:execl and execlp

代碼實例:https://github.com/lotapp/BaseCode/blob/master/python/5.concurrent/Linux/execl.py

之前有說fork后,相當於copy了一份,.text里面放的是代碼段,如果想要調用另一個程序,可以使用execlxxx,他會把.text里面的代碼替換掉

help(os.execl)

Help on function execl in module os:

execl(file, *args)
    execl(file, *args)

    Execute the executable file with argument list args, replacing the
    current process.
help(os.execlp)

Help on function execlp in module os:

execlp(file, *args)
    execlp(file, *args)

    Execute the executable file (which is searched for along PATH)
    with argument list args, replacing the current process.

來看個例子,os.execl("絕對路徑","參數或者指令") or os.execlp("Path中包含的命令","參數或者指令")

提示:查看命令路徑:eg:which ls

import os

def main():
    pid = os.fork()
    if pid == 0:
        # 第二個參數不能為None,,第一個路徑為絕對路徑 eg:os.execl("/bin/ls"," ")
        os.execl("/bin/ls", "ls", "-al")
        # os.execlp("ls", "ls", "-al")  # 執行Path環境變量可以搜索到的命令
        print("exec函數族會替換代碼,我是不會被執行的,除非上面的出問題了")

    print("-" * 10)  # 父進程執行一次,子進程不會執行

if __name__ == '__main__':
    main()

注意輸出信息:os.execlp("ls", "ls", "-al")

----------
總用量 28
drwxrwxr-x 6 dnt dnt 4096 7月  26 05:23 .
drwxrwxr-x 9 dnt dnt 4096 7月  24 20:55 ..
drwxr-xr-x 2 dnt dnt 4096 7月  19 14:47 .ipynb_checkpoints
drwxrwxr-x 6 dnt dnt 4096 7月  26 06:27 Linux
-rw-rw-r-- 1 dnt dnt   93 7月  26 05:49 temp.py
drwxrwxr-x 2 dnt dnt 4096 7月  24 15:29 .vscode
drwxrwxr-x 2 dnt dnt 4096 7月  25 12:18 進程
 

2.4.1.進程間通信~文件通信

代碼實例:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/Linux/進程通信/1.file

講管道之前,先說個簡單的:通過文件進行通信

來一個簡單讀寫的案例先適應下文件操作:

In [1]:
!ls

# 這種寫法類似於Net的 using 托管
with open("test.txt", "w") as f:
    f.write("從前有座山,山上有座廟,廟里有個老和尚和一個小和尚。有一天,老和尚對小和尚說:")

with open("test.txt", "r") as f:
    data = f.read()
    print(data)

!ls
 
並發編程~進程先導篇.ipynb
從前有座山,山上有座廟,廟里有個老和尚和一個小和尚。有一天,老和尚對小和尚說:
test.txt  並發編程~進程先導篇.ipynb
 

來個簡單的案例:

import os
import time

def main():
    pid = os.fork()

    if pid > 0:
        print("父進程(pid=%d)開始寫入:" % os.getpid())
        with open(str(pid), "w") as f:
            f.write("[父進程寫入]從前有座山,山上有座廟,廟里有個老和尚和一個小和尚。有一天,老和尚對小和尚說:\n")
        time.sleep(2)
        print("父進程(pid=%d)開始讀取:" % os.getpid())
        with open(str(pid), "r") as f:
            print(f.read())

        wpid, status = os.wait()  # 收屍
        print("pid=%d已經回收,status:%d" % (wpid, status))

    elif pid == 0:
        print("子進程(pid=%d)開始讀取:" % os.getpid())
        with open(str(os.getpid()), "r") as f:
            print(f.read())
        print("子進程(pid=%d)開始追加:" % os.getpid())
        with open(str(os.getpid()), "a") as f:  # 追加
            f.write("[子進程追加]從前有座山,山上有座廟,廟里有個老和尚和一個小和尚。有一天,老和尚對小和尚說:\n")

    print("\n進程(pid=%d)完蛋了" % os.getpid())

if __name__ == '__main__':
    main()

圖示: 文件通信

 

2.4.2.進程間通信~隊列 Queue(常用)

代碼實例:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/Linux/進程通信/2.Queue

from multiprocessing import Queue

help(Queue)

Help on method Queue in module multiprocessing.context:

Queue(maxsize=0) method of multiprocessing.context.DefaultContext instance
    Returns a queue object

實例化對象幫助文檔:

from multiprocessing import Queue

q = Queue(2)
help(q)

Help on Queue in module multiprocessing.queues object:

class Queue(builtins.object)
 |  Methods defined here:
 |  
 |  __getstate__(self)
 |  
 |  __init__(self, maxsize=0, *, ctx)
 |      Initialize self.  See help(type(self)) for accurate signature.
 |  
 |  __setstate__(self, state)
 |  
 |  cancel_join_thread(self)
 |  
 |  close(self)
 |  
 |  empty(self)
 |  
 |  full(self)
 |  
 |  get(self, block=True, timeout=None)
 |  
 |  get_nowait(self)
 |  
 |  join_thread(self)
 |  
 |  put(self, obj, block=True, timeout=None)
 |  
 |  put_nowait(self, obj)
 |  
 |  qsize(self)
 |  
 |  ----------------------------------------------------------------------
 |  Data descriptors defined here:
 |  
 |  __dict__
 |      dictionary for instance variables (if defined)
 |  
 |  __weakref__
 |      list of weak references to the object (if defined)

詳細內容(如:非阻塞、池中應用等)下次講代碼的時候會詳說,簡單看個例子:

import os
from multiprocessing import Queue

def main():
    q = Queue(1)  # 創建一個容量為1的隊列(只能put接受1條,get取出后才可以放)
    pid = os.fork()
    if pid == 0:
        print("[子進程]:pid:%d,ppid:%d" % (os.getpid(), os.getppid()))
        q.put("父親大人,我可以出去玩嗎?")
        output = q.get()
        print("[子進程]收到父親大人回復:%s" % output)
    elif pid > 0:
        print("[父進程]:pid:%d,ppid:%d" % (os.getppid(), os.getppid()))
        output = q.get()  # 兒子每天出去都會說,等待ing
        print("[父進程]收到兒子的話:%s" % output)
        q.put("准了")

        wpid, status = os.wait()
        print("[父進程]幫pid:%d收屍,狀態:%d" % (wpid, status))

    print("[OVER]:pid:%d,ppid:%d" % (os.getpid(), os.getppid()))

if __name__ == '__main__':
    main()

輸出:

[父進程]:pid:12403,ppid:12403
[子進程]:pid:744,ppid:743
[父進程]收到兒子的話:父親大人,我可以出去玩嗎?
[子進程]收到父親大人回復:准了
[OVER]:pid:744,ppid:743
[父進程]幫pid:744收屍,狀態:0
[OVER]:pid:743,ppid:12403

2.4.3.進程間通信~PIPE匿名管道(常用)

代碼實例:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/Linux/進程通信/3.pipe

知識普及:

  1. 命令模式下默認有六個終端tty1-tty6
  2. tty7代表圖形登錄
  3. 遠程登錄會顯示pts/0,1,2...

如果終端的概念還不清楚可以看之前的文章:https://www.cnblogs.com/dunitian/p/6658273.html

help(os.pipe)

Help on built-in function pipe in module posix:

pipe()
    Create a pipe.

    Returns a tuple of two file descriptors:
      (read_fd, write_fd)

匿名管道的模式其實我們平時都在用,只是不知道而已,比如:ps aux | grep "python" 這個 | 就是匿名管道

本質:內核的緩沖區,不占用磁盤空間(可以看成偽文件)【默認4k,相差不大的情況下系統會自動微調大小】

我們來看一下:ulimit -a

Ubuntu 18.04

core file size          (blocks, -c) 0
data seg size           (kbytes, -d) unlimited
scheduling priority             (-e) 0
file size               (blocks, -f) unlimited
pending signals                 (-i) 14894
max locked memory       (kbytes, -l) 16384
max memory size         (kbytes, -m) unlimited
open files                      (-n) 1024
pipe size            (512 bytes, -p) 8
POSIX message queues     (bytes, -q) 819200
real-time priority              (-r) 0
stack size              (kbytes, -s) 8192
cpu time               (seconds, -t) unlimited
max user processes              (-u) 14894
virtual memory          (kbytes, -v) unlimited
file locks                      (-x) unlimited

CentOS 7.5

core file size          (blocks, -c) 0
data seg size           (kbytes, -d) unlimited
scheduling priority             (-e) 0
file size               (blocks, -f) unlimited
pending signals                 (-i) 3543
max locked memory       (kbytes, -l) 64
max memory size         (kbytes, -m) unlimited
open files                      (-n) 1024
pipe size            (512 bytes, -p) 8
POSIX message queues     (bytes, -q) 819200
real-time priority              (-r) 0
stack size              (kbytes, -s) 8192
cpu time               (seconds, -t) unlimited
max user processes              (-u) 3543
virtual memory          (kbytes, -v) unlimited
file locks                      (-x) unlimited

Ubuntu2 CentOS2

原理:算法實現的環形隊列(隊列:先進先出)

特點

  1. 操作管道的進程被銷毀后,管道自動被釋放
  2. 管道默認是阻塞(讀、寫兩端都阻塞)
  3. 管道有兩個端,一個是讀端(read_fd,一般都為3),一個是寫端(write_fd,一般都為4)
  4. 單向傳輸

4的意思是這樣的(網上找個圖,然后改造一下) 1.網絡

驗證一下3: 4.fdTuple

為什么是3開始呢?查看一下源碼:(https://github.com/python/cpython/blob/v3.7.0/Lib/pty.py

STDIN_FILENO = 0  # 看這:文件描述符輸入(讀端)
STDOUT_FILENO = 1 # 看這:文件描述符輸出(寫端)
STDERR_FILENO = 2 # 已經被占用了0~2了,自然從3開始

# 下面的不用你會,上面Code看完,我們的目的就達到了,下面看看即可
def fork():
    """fork() -> (pid, master_fd) Fork分叉后讓子進程成為控制終端的會話領導者"""
    try:
        pid, fd = os.forkpty() # 設置會話領導
    except (AttributeError, OSError):
        pass
    else: # 沒有錯誤執行
        if pid == CHILD:
            os.setsid()
        return pid, fd

    master_fd, slave_fd = openpty()
    pid = os.fork()
    if pid == CHILD:
        # 建立一個新的會話
        os.setsid()
        os.close(master_fd)

        # 把子進程里面的 slave_fd 重定向到 stdin/stdout/stderr
        os.dup2(slave_fd, STDIN_FILENO)
        os.dup2(slave_fd, STDOUT_FILENO)
        os.dup2(slave_fd, STDERR_FILENO)
        if (slave_fd > STDERR_FILENO):
            os.close (slave_fd)

        # 顯式打開tty,使它成為一個tty控制
        tmp_fd = os.open(os.ttyname(STDOUT_FILENO), os.O_RDWR)
        os.close(tmp_fd)
    else:
        os.close(slave_fd)

    # Parent and child process.
    return pid, master_fd

畫個大綱圖理解一下:(讀的時候關閉寫,寫的時候關閉讀) 父子進程圖示

結合單向傳輸理解一下:(父子只能一個人寫,另一個人只能讀) PIPE 簡單概況上圖:子進程只讀,父進程只寫 or 子進程只寫,父進程只讀 (如果想要相互讀寫通信~兩根管道走起)

簡單分析一下 ps aux | grep python ,本來ps aux是准備在終端中輸出的,現在寫入內核緩沖區了,grep從內核緩沖區里面讀取,把符合條件的輸出到終端

終端文件描述獲取:

import sys

sys.stdin.fileno() # STDIN_FILENO = 0:文件描述符輸入(讀端)
sys.stdout.fileno() # STDOUT_FILENO = 1:看這:文件描述符輸出(寫端)

我們用程序實現一個同樣效果的:(grep有顏色,其實就是加了--color=auto)

import os
import sys

def main():
    # 創建內核緩存區(偽文件)
    read_fd, write_fd = os.pipe()
    print("read_fd:%s\nwrite_fd:%s" % (read_fd, write_fd))

    pid = os.fork()
    if pid > 0:
        print("[父進程]pid=%d,ppid=%d" % (os.getpid(), os.getppid()))

        # 寫或者讀,則需要關閉另一端(防止自己寫自己讀)
        os.close(read_fd)
        # dup2(oldfd,newfd) 把寫端數據重定向到文件描述符輸出端
        os.dup2(write_fd, sys.stdout.fileno())  # STDOUT_FILENO==1 (文件描述符輸出,寫端)
        # 僵桃李代
        os.execlp("ps", "ps", "aux")

    elif pid == 0:
        print("[子進程]pid=%d,ppid=%d" % (os.getpid(), os.getppid()))

        # 子進程現在需要讀,關閉寫段
        os.close(write_fd)
        # dup2(oldfd,newfd) 把讀端數據重定向到文件描述符輸入端
        os.dup2(read_fd, sys.stdin.fileno())  # STDOUT_FILENO == 0 (文件描述符輸入,讀端)
        # 僵桃李代 (默認是從終端讀,重定向后從內核緩沖區讀)
        os.execlp("grep", "grep", "python", "--color=auto")

if __name__ == '__main__':
    main()

輸出:(用到的函數:os.pipe() and os.dup2(oldfd,newfd)匿名管道

PS:在C系列里面如果你該關閉的fd沒關,會資源浪費,python好像做了處理,沒能夠問題復現,所以還是建議父子一方只讀,一方只寫


概念再理解:fork了兩個子進程,則文件描述符被復制了2份,大家文件描述符的3、4都指向了pipe管道read_fdwrite_fd 子進程通信1]

來張圖理解一下,哪些fd被close了(如果讓子進程之間通信,父進程因為不讀不寫,所以讀寫都得關閉) 子進程通信2 代碼演示:(這次注釋很全)

import os
import sys
import time

def main():

    read_fd, write_fd = os.pipe()  # 可以思考為啥在上面創建管道(提示.text代碼段都一樣)

    i = 0
    while i < 2:
        pid = os.fork()
        # 防止子進程生猴子
        if pid == 0:
            break
        i += 1

    # 子進程1
    if i == 0:
        print("[子進程%d]pid=%d,ppid=%d" % (i, os.getpid(), os.getppid()))

        # 准備重定向到寫端,所以先關了讀端
        os.close(read_fd)
        os.dup2(write_fd, sys.stdout.fileno())  # STDOUT_FILENO == 1 (文件描述符輸出,寫端)

        # 僵桃李代
        os.execlp("ps", "ps", "-aux")
        # 僵桃李代后,.text代碼段都被替換了,自然不會執行
        print("我是不會執行的,不信你看唄")
    elif i == 1:
        print("[子進程%d]pid=%d,ppid=%d" % (i, os.getpid(), os.getppid()))

        # 准備重定向到讀端,所以先關了寫端
        os.close(write_fd)
        os.dup2(read_fd, sys.stdin.fileno())  # STDIN_FILENO == 0 (文件描述符輸入,讀端)

        # 僵桃李代  ”bash“是查找關鍵詞,你寫你想找的字符串即可
        os.execlp("grep", "grep", "bash", "--color=auto")

        # 僵桃李代后,.text代碼段都被替換了,自然不會執行
        print("我是不會執行的,不信你看唄")
    elif i == 2:
        print("[父進程]pid=%d,ppid=%d" % (os.getpid(), os.getppid()))

        # 我不寫不讀
        os.close(read_fd)
        os.close(write_fd)

        # 為了大家熟練掌握wait系列,這次用waitpid
        while True:
            info = ()
            try:
                info = os.waitpid(-1, os.WNOHANG)  # 非阻塞的方式回收所有子進程
            except OSError:
                break  # waitpid返回-1的時候,Python會拋出異常

            if info[0] > 0:
                print("父進程收屍成功:pid=%d,ppid=%d,狀態status:%d" %
                      (os.getpid(), os.getppid(), info[1]))
            print("父進程做其他事情...")
            time.sleep(0.005)  # 休息 0.005s

    print("[父進程-遺言]pid=%d,ppid=%d" % (os.getpid(), os.getppid()))

if __name__ == '__main__':
    main()

結果:

[父進程]pid=18678,ppid=27202
[子進程0]pid=18679,ppid=18678
[子進程1]pid=18680,ppid=18678
父進程做其他事情...
父進程做其他事情...
父進程做其他事情...
父進程做其他事情...
dnt       4622  0.0  0.1  24880  5688 pts/2    Ss   05:28   0:00 bash
父進程做其他事情...
dnt      15419  0.0  0.1  25152  5884 pts/0    Ss+  06:29   0:00 /bin/bash
dnt      18680  0.0  0.0  16184  1044 pts/4    S+   13:25   0:00 grep bash --color=auto
dnt      27202  0.0  0.1  25012  6052 pts/4    Ss   08:25   0:00 bash
父進程收屍成功:pid=18678,ppid=27202,狀態status:0
父進程做其他事情...
父進程收屍成功:pid=18678,ppid=27202,狀態status:0
父進程做其他事情...
[父進程-遺言]pid=18678,ppid=27202
In [1]:
# 說管道讀寫之前,先復習個知識點:
bts = "尷尬".encode()
b_str = bts.decode()
print(bts)
print(b_str)
 
b'\xe5\xb0\xb4\xe5\xb0\xac'
尷尬
 

匿名管道讀寫操作

上面知識點忘了可以復習一下:https://www.cnblogs.com/dotnetcrazy/p/9278573.html#1.2.字符串和編碼

用到的函數:(這個就不需要使用dup2來重定向到終端了【有血緣關系的進程之間通信,並不依賴於終端顯示】)

os.write(fd, str)寫入字符串到文件描述符 fd中. 返回實際寫入的字符串長度

os.read(fd, n)從文件描述符 fd 中讀取最多 n 個字節,返回包含讀取字節的字符串

如果文件描述符fd對應文件已達到結尾, 返回一個空字符串

舉個父子間通信的例子(比C系列簡單太多)【下次講的通用Code會更簡單】

import os

def close_fd(*fd_tuple_args):
    """關閉fd,fd_tuple_args是可變參數"""
    for item in fd_tuple_args:
        os.close(item[0])
        os.close(item[1])

def main():
    # 管道是單向的,相互讀寫,那就創建兩個管道
    fd_tuple1 = os.pipe()  # 進程1寫,進程2讀
    fd_tuple2 = os.pipe()  # 進程2寫,進程1讀

    i = 0
    while i < 2:
        pid = os.fork()
        if pid == 0:
            break
        i += 1
    # 子進程1
    if i == 0:
        print("[子進程]pid:%d,ppid:%d" % (os.getpid(), os.getppid()))

        os.close(fd_tuple1[0])  # 進程1寫,則關閉下讀端
        msg_str = "進程1說:兄弟,今天擼串嗎?"
        os.write(fd_tuple1[1], msg_str.encode())  # 把字符串xxx轉換成bytes

        # 不讀的我關閉掉:
        os.close(fd_tuple2[1])  # 進程2寫,我不需要寫,關閉寫端
        bts = os.read(fd_tuple2[0], 1024)
        print("[子進程1]", bts.decode())

        exit(0)  # 退出后就不執行下面代碼塊語句了
    # 子進程2
    elif i == 1:
        print("[子進程2]pid:%d,ppid:%d" % (os.getpid(), os.getppid()))

        os.close(fd_tuple1[1])  # 進程2讀,則關閉下寫端
        bts = os.read(fd_tuple1[0], 1024)
        print("[子進程2]", bts.decode())

        # 不讀的我關閉掉:
        os.close(fd_tuple2[0])  # 進程2寫,關閉讀端
        msg_str = "進程2說:可以可以~"
        os.write(fd_tuple2[1], msg_str.encode())  # 把字符串xxx轉換成bytes

        exit()  # 不加參數默認是None
    # 父進程
    elif i == 2:
        print("[父進程]pid:%d,ppid:%d" % (os.getpid(), os.getppid()))

        # 父進程不讀不寫,就看看
        close_fd(fd_tuple1, fd_tuple2)

        # 收屍ing
        while True:
            try:
                wpid, status = os.wait()
                print("[父進程~收屍]子進程PID:%d 的狀態status:%d" % (wpid, status))
            except OSError:
                break
    # 子進程都exit()退出了,不會執行這句話了
    print("[父進程遺言]pid:%d,ppid:%d" % (os.getpid(), os.getppid()))

if __name__ == '__main__':
    main()

輸出結果:

[父進程]pid:12002,ppid:27202
[子進程2]pid:12004,ppid:12002
[子進程]pid:12003,ppid:12002
[子進程2] 進程1說:兄弟,今天擼串嗎?
[子進程1] 進程2說:可以可以~
[父進程~收屍]子進程PID:12003 的狀態status:0
[父進程~收屍]子進程PID:12004 的狀態status:0
[父進程遺言]pid:12002,ppid:27202

非阻塞管道(簡寫法)

隊列的getput方法默認也是阻塞的,如果想非阻塞可以調用get_nowaitput_nowait來變成非阻塞,那pipe管道呢?

C系列一般使用fcntl來實現,Python進行了封裝,我們可以通過os.pipe2(os.O_NONBLOCK)來設置非阻塞管道

help(os.pipe2)

Help on built-in function pipe2 in module posix:

pipe2(flags, /)
    Create a pipe with flags set atomically.

    Returns a tuple of two file descriptors:
      (read_fd, write_fd)

    flags can be constructed by ORing together one or more of these values:
    O_NONBLOCK, O_CLOEXEC.

舉個例子:

import os
import time

def main():
    r_fd, w_fd = os.pipe2(os.O_NONBLOCK | os.O_CLOEXEC)

    pid = os.fork()
    if pid == 0:
        print("子進程:pid=%d,ppid=%d" % (os.getpid(), os.getppid()))
        time.sleep(0.5)

        # 和父進程進行通信
        os.close(r_fd)
        os.write(w_fd, "老爸,我出去玩了~".encode())

        exit(0)  # 子進程退出
    elif pid > 0:
        print("父進程:pid=%d,ppid=%d" % (os.getpid(), os.getppid()))

        # 讀兒子的留言
        os.close(w_fd)
        b_msg = b""
        while True:
            try:
                b_msg = os.read(r_fd, 1)  # 沒有數據就出錯(一般都是等待一會,也可以和信號聯合使用)
            except OSError:
                print("兒子怎么沒有留言呢?")

            print("父進程:做其他事情...")
            if len(b_msg) > 0:
                break
            time.sleep(0.1)

        # 繼續讀剩下的消息
        b_msg += os.read(r_fd, 1024)
        print("兒子留言:", b_msg.decode())

        wpid, status = os.wait()
        print("幫兒子做掃尾工作:pid=%d,status=%d" % (wpid, status))

    print("父進程遺言:pid=%d,status=%d" % (os.getpid(), os.getppid()))

if __name__ == '__main__':
    main()

輸出:

父進程:pid=31430,ppid=27202
子進程:pid=31431,ppid=31430
兒子怎么沒有留言呢?
父進程:做其他事情...
兒子怎么沒有留言呢?
父進程:做其他事情...
兒子怎么沒有留言呢?
父進程:做其他事情...
兒子怎么沒有留言呢?
父進程:做其他事情...
兒子怎么沒有留言呢?
父進程:做其他事情...
父進程:做其他事情...
兒子留言: 老爸,我出去玩了~
幫兒子做掃尾工作:pid=31431,status=0
父進程遺言:pid=31430,status=27202

擴展:

  1. 數據只能讀1次(隊列和棧都這樣)
  2. 匿名管道必須有血緣關系的進程才能通信
  3. 半雙工通信:同一時刻里,信息只能有一個傳輸方向(類似於對講機)
 

2.4.4.進程間通信~FIFO有名管道

代碼實例:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/Linux/進程通信/4.fifo

FIFO管道

  1. 有名管道,除了血緣關系進程通信,沒有血緣關系的進程也可以通信
  2. 在磁盤上有會存放一個文件類型為p,大小為0的管道文件(偽文件,大小始終為0)
  3. 內核中有一個對應的緩沖區(數據就放在里面)
  4. 半雙工通信:同一時刻里,信息只能有一個傳輸方向(類似於對講機)
  5. fifo要求讀寫雙方必須同時打開才可以繼續進行讀寫操作,否則打開操作會堵塞直到對方也打開
  6. 如果讀端全部關閉,管道破裂,進程自動被終止(PIPE也是這樣的)

對2的驗證: 其實你用ll來查看,就是文件類型為p的文件(大小始終為0) 5.mkfifo

Linux底層提供了mkfifo函數,Python創建使用os.mkfifo()

畫個圖來看3: 5.fifo圖示

知識普及

help(os.open)

Help on built-in function open in module posix:

open(path, flags, mode=511, *, dir_fd=None)
    Open a file for low level IO.  Returns a file descriptor (integer).

    If dir_fd is not None, it should be a file descriptor open to a directory,
      and path should be relative; path will then be relative to that directory.
    dir_fd may not be implemented on your platform.
      If it is unavailable, using it will raise a NotImplementedError.

flags -- 該參數可以是以下選項,多個使用 | 隔開:

  • os.O_RDONLY: 以只讀的方式打開
  • os.O_WRONLY: 以只寫的方式打開
  • os.O_RDWR : 以讀寫的方式打開
  • os.O_NONBLOCK: 打開時不阻塞
  • os.O_APPEND: 以追加的方式打開
  • os.O_CREAT: 創建並打開一個新文件
  • os.O_TRUNC: 打開一個文件並截斷它的長度為零(必須有寫權限)
  • os.O_EXCL: 如果指定的文件存在,返回錯誤
  • os.O_SHLOCK: 自動獲取共享鎖
  • os.O_EXLOCK: 自動獲取獨立鎖
  • os.O_DIRECT: 消除或減少緩存效果
  • os.O_FSYNC : 同步寫入
  • os.O_NOFOLLOW: 不追蹤軟鏈接

很多人直接使用了Open方法open(fifo_path, "r")open(fifo_path, "w")貌似也是可以的,但是不推薦

我們使用官方推薦的方法

無血緣關系通信

fifo操作非常簡單,和文件IO操作幾乎一樣,看個無血緣關系進程通信的例子:

進程1源碼:r_fifo.py

import os

def main():
    file_name = "fifo_file"
    if not os.path.exists(file_name):
        os.mkfifo(file_name)

    fd = os.open(file_name, os.O_RDONLY)  # 只讀(阻塞)
    while True:
        b_msg = os.read(fd, 1024)
        if len(b_msg) > 0:
            print(b_msg.decode())

if __name__ == '__main__':
    main()

進程2源碼:w_fifo.py

import os
import time

def main():
    file_name = "fifo_file"
    if not os.path.exists(file_name):
        os.mkfifo(file_name)

    fd = os.open(file_name, os.O_WRONLY)  # 只寫
    while True:
        time.sleep(1)  # 模擬一下實際生產環境下的 讀快寫慢
        try:
            os.write(fd, "我是說話有魔性,喝水會長胖的小明同學".encode())  # 寫入bytes
        except BrokenPipeError:
            print("如果讀端全部關閉,管道破裂,進程自動被終止")
            break

if __name__ == '__main__':
    main()

做個讀端的測試: fifo小明

讀寫雙測:(fifo文件大小始終為0,只是偽文件而已) fifo小明2

擴展一下,如果你通過終端讀寫呢?(同上) fifotest

再來個讀寫的案例

3.rw_fifo1.py

import os

def main():
    file_name = "fifo_temp"
    if not os.path.exists(file_name):
        os.mkfifo(file_name)

    fd = os.open(file_name, os.O_RDWR)  # 你輸入os.O_rw就會有這個選項了,不用硬記
    msg = os.read(fd, 1024).decode()  # 阻塞的方式,不用擔心
    print("[進程2]%s" % msg)
    os.write(fd, "小明啊,你忘記你長幾斤肉了?".encode())

if __name__ == '__main__':
    main()

rw_fifo2.py

import os
import time

def main():
    file_name = "fifo_temp"
    if not os.path.exists(file_name):
        os.mkfifo(file_name)

    fd = os.open(file_name, os.O_RDWR)  # 你輸入os.O_rw就會有這個選項了,不用硬記
    os.write(fd, "小潘,擼串去不?".encode())

    time.sleep(3)  # 防止自己寫的被自己讀了

    msg = os.read(fd, 1024).decode()  # 阻塞的方式,不用擔心
    print("[進程1]]%s" % msg)

if __name__ == '__main__':
    main()

5.fiforw

有血緣關系通信

來個父子間通信:(代碼比較簡單,和上面差不多,看看即可)

import os

def main():
    file_name = "fifo_test"
    if not os.path.exists(file_name):
        os.mkfifo(file_name)

    fd = os.open(file_name, os.O_RDWR)  # 讀寫方式打開文件描述符 (O_RDONLY | O_WRONLY)

    pid = os.fork()
    if pid == 0:
        print("子進程:PID:%d,PPID:%d" % (os.getpid(), os.getppid()))

        os.write(fd, "子進程說:老爸,我想出去玩".encode())  # 寫
        msg = os.read(fd, 1024).decode()  # 讀
        print("[子進程]%s" % msg)
    elif pid > 0:
        print("父進程:PID:%d,PPID:%d" % (os.getpid(), os.getppid()))

        msg = os.read(fd, 1024).decode()  # 阻塞方式,不用擔心
        print("[父進程]%s" % msg)
        os.write(fd, "父進程說:去吧乖兒子".encode())

        # 給子進程收屍
        wpid, status = os.wait()
        print("父進程收屍:子進程PID=%d,PPID=%d" % (wpid, status))

    print("進程遺言:PID=%d,PPID=%d" % (os.getpid(), os.getppid()))  # 剩下的代碼段

if __name__ == '__main__':
    main()

輸出:

父進程:PID:21498,PPID:20943
子進程:PID:21499,PPID:21498
[父進程]子進程說:老爸,我想出去玩
[子進程]父進程說:去吧乖兒子
進程遺言:PID=21499,PPID=21498
父進程收屍:子進程PID=21499,PPID=0
進程遺言:PID=21498,PPID=20943
 

2.4.5.進程間通信~MMAP內存映射(常用)

代碼實例:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/Linux/進程通信/5.mmap

好處:內存操作,比IO快

缺點:和文件一樣不會像管道一樣阻塞(讀的可能不全,需要自己考慮讀寫效率)

畫個簡單的圖示: 1.mmap圖示 PS:內存映射一個文件並不會導致整個文件被讀取到內存中:

  1. 文件並沒有被復制到內存緩存中,操作系統僅僅為文件內容保留了一段虛擬內存。
  2. 當你訪問文件的不同區域時,這些區域的內容才根據需要被讀取並映射到內存區域中。
  3. 沒有訪問的部分還是留在磁盤上

以Linux為例,簡單解析一下幫助文檔:(加粗的是必填參數)

mmap.mmapfileno,length[,flags=MAP_SHARED][,prot=PROT_WRITE|PROT_READ][,access=ACCESS_DEFAULT][,offset]
  1. fileno:就是我們經常說的文件描述fd
    1. 可以通過os.open()直接打開fd
    2. 也可以調用文件的f.fileno()
  2. length:映射區大小,(一般寫0就OK了)
    1. 如果length為0,則映射的最大長度將是調用時文件的當前大小
    2. 一般把文件大小 os.path.getsize(path)傳進去就可以了
  3. flags:映射區性質,默認是用共享
    1. MAP_SHARED 共享(數據會自動同步磁盤)
    2. MAP_PRIVATE 私有(不同步磁盤)
  4. prot:映射區權限,如果指定,就會提供內存保護(默認即可)
    1. PROT_READ 讀
    2. PROT_READ | PROT_WRITE 寫(必須有讀的權限)
  5. access:可以指定訪問來代替flags和prot作為可選的關鍵字參數【這個是Python為了簡化而添加的】
    1. ACCESS_READ:只讀
    2. ACCESS_WRITE:讀和寫(會影響內存和文件)
    3. ACCESS_COPY:寫時復制內存(影響內存,但不會更新基礎文件)
    4. ACCESS_DEFAULT:延遲到prot(3.7才添加)
  6. offset,偏移量,默認是0(和文件一致)
In [1]:
# 這個夠明了了,\0轉換成二進制就是\x00
"\0".encode()
Out[1]:
b'\x00'
In [2]:
# 老規矩,開始之前,擴充一個小知識點:(空字符串和'\0'區別)
a = "" # 空字符串 (Python里面沒有char類型)
b = "\x00" # `\0` 的二進制寫法
c = "\0"

print(a)
print(b)
print(c)

print(len(a))
print(len(b))
print(len(c))
 
0
1
1
 

看個簡單的案例快速熟悉mmap模塊:(大文件處理這塊先不說,以后要是有機會講數據分析的時候會再提)

m.size()  # 查看文件大小
m.seek(0)  # 修改Postion位置
m.tell()  # 返回 m 對應文件的Postion位置
m.read().translate(None, b"\x00")  # 讀取所有內容並把\0刪除
m.closed  # 查看mmap是否關閉

# 支持切片操作
m[0:10] # 取值
m[0:10] = b"1234567890"  # 賦值

# 對自行模式大文件處理的同志,給個提示
m.readline().decode() # 讀一行,並轉換成str
m.size()==m.tell() # while循環退出條件

熟悉一下上面幾個方法:

import os
import mmap

def create_file(filename, size):
    """初始化一個文件,並把文件擴充到指定大小"""
    with open(filename, "wb") as f:
        f.seek(size - 1)  # 改變流的位置
        f.write(b"\x00")  # 在末尾寫個`\0`

def main():
    create_file("mmap_file", 4096)  # 創建一個4k的文件
    with mmap.mmap(os.open("mmap_file", os.O_RDWR), 0) as m:  # 創建映射
        print(m.size())  # 查看文件大小
        m.resize(1024)  # 重新設置文件大小
        print(len(m))  # len也一樣查看文件大小
        print(m.read().translate(None, b"\x00"))  # 讀取所有內容並把\0刪除
        print(m.readline().decode())  # 讀取一行,bytes轉成str
        print(m.tell())  # 返回 m 對應文件的當前位置
        m.seek(0)  # 修改Postion位置
        print(m.tell())  # 返回 m 對應文件的當前位置
        print(m[0:10])  # 支持切片操作
        print("postion_index:%d" % m.tell())
        m[0:10] = b"1234567890"  # 賦值
        print("postion_index:%d" % m.tell())
        print(m[0:10])  # 取值
        print("postion_index:%d" % m.tell())
        print(m[:].decode())  # 全部讀出來
    print(m.closed)  # 查看mmap是否關閉

if __name__ == '__main__':
    main()

輸出:(測試了一下,切片操作【讀、寫】不會影響postion)

4096
1024
b''

1024
0
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
postion_index:0
postion_index:0
b'1234567890'
postion_index:0
1234567890
True

看下open打開的案例:

import os
import mmap

def main():
    with open("temp", "wb") as f:
        f.write("小明同學最愛刷碗\n小潘同學最愛打掃".encode())

    # 打開磁盤二進制文件進行更新(讀寫)
    with open("temp", "r+b") as f:
        with mmap.mmap(f.fileno(), 0) as m:
            print("postion_index:%d" % m.tell())
            print(m.readline().decode().strip())  # 轉成str並去除兩端空格
            print("postion_index:%d" % m.tell())
            print(m[:].decode())  # 全部讀出來
            print("postion_index:%d" % m.tell())
            m.seek(0)
            print("postion_index:%d" % m.tell())

if __name__ == '__main__':
    main()

輸出:

postion_index:0
小明同學最愛刷碗
postion_index:25
小明同學最愛刷碗
小潘同學最愛打掃
postion_index:25
postion_index:0

其他方法可以參考:這篇文章(Python3很多都和Python2不太相同,辯證去看吧)

注意一點:

通過MMap內存映射之后,進程間通信並不是對文件操作,而是在內存中。文件保持同步只是因為mmap的flags默認設置的是共享模式(MAP_SHARED)

PS:還記得之前講類方法和實例方法的時候嗎?Python中類方法可以直接被對象便捷調用,這邊mmap實例對象中的方法,其實很多都是類方法 步入正軌

來看一個有血緣關系的通信案例:(一般用匿名)

import os
import time
import mmap

def create_file(file_name, size):
    with open(file_name, "wb") as f:
        f.seek(size - 1)
        f.write(b"\0x00")

def main():
    file_name = "temp.bin"
    # mmap映射的時候不能映射空文件,所以我們自己創建一個
    create_file(file_name, 1024)

    fd = os.open(file_name, os.O_RDWR)
    with mmap.mmap(fd, 0) as m:  # m.resize(1024) # 大小可以自己調整的
        pid = os.fork()
        if pid == 0:
            print("[子進程]PID:%d,PPID:%d" % (os.getpid(), os.getppid()))
            m.write("子進程說:老爸,我想出去玩了~\n".encode())
            time.sleep(3)
            print(m.readline().decode().strip())
            exit(0)
        elif pid > 0:
            print("[父進程]PID:%d,PPID:%d" % (os.getpid(), os.getppid()))
            time.sleep(1)  # 和文件一樣,非堵塞
            print(m.readline().decode().strip())
            m.write("父進程說:去吧去吧\n".encode())
            wpid, status = os.wait()
            print("[父進程]收屍:PID:%d,Status:%d" % (wpid, status))
            exit(0)

if __name__ == '__main__':
    main()

輸出:

[父進程]PID:6843,PPID:3274
[子進程]PID:6844,PPID:6843
子進程說:老爸,我想出去玩了~
父進程說:去吧去吧
[父進程]收屍:PID:6844,Status:0

有血緣關系使用MMAP通信

父進程創建了一份mmap對象,fork產生子進程的時候相當於copy了一份指向,所以可以進行直接通信(聯想fd的copy)

import os
import time
import mmap

def main():
    # 不記錄文件中,直接內存中讀寫(這個地方len就不能為0了,自己指定一個大小eg:4k)
    with mmap.mmap(-1, 4096) as m:
        pid = os.fork()
        if pid == 0:
            print("[子進程]PID:%d,PPID:%d" % (os.getpid(), os.getppid()))
            m.write("[子進程]老爸我出去嗨了~\n".encode())
            time.sleep(2)
            msg = m.readline().decode().strip()
            print(msg)
            exit(0)
        elif pid > 0:
            print("[父進程]PID:%d,PPID:%d" % (os.getpid(), os.getppid()))
            time.sleep(1)
            msg = m.readline().decode().strip()
            print(msg)
            m.write("[父進程]去吧,皮卡丘~".encode())

            wpid, status = os.wait()
            print("[父進程]收屍:PID:%d,Status:%d" % (wpid, status))
            exit(0)

if __name__ == '__main__':
    main()

輸出:

[父進程]PID:8115,PPID:3274
[子進程]PID:8116,PPID:8115
[子進程]老爸我出去嗨了~
[父進程]去吧,皮卡丘~
[父進程]收屍:PID:8116,Status:0

無血緣關系使用MMAP通信

因為不同進程之前沒有關聯,必須以文件為媒介(文件描述符fd)

進程1:

import os
import time
import mmap

def create_file(file_name, size):
    with open(file_name, "wb") as f:
        f.seek(size - 1)
        f.write(b"\0x00")

def main():
    file_name = "temp.bin"

    if not os.path.exists(file_name):
        # mmap映射的時候不能映射空文件,所以我們自己創建一個
        create_file(file_name, 1024)

    fd = os.open(file_name, os.O_RDWR)
    with mmap.mmap(fd, 0) as m:  # m.resize(1024) # 大小可以自己調整的
        print("[進程1]PID:%d,PPID:%d" % (os.getpid(), os.getppid()))
        m.write("進程1說:小明放學去擼串嗎?\n".encode())
        time.sleep(3)
        print(m.readline().decode().strip())
        exit(0)

if __name__ == '__main__':
    main()

進程2:

import os
import time
import mmap

def create_file(file_name, size):
    with open(file_name, "wb") as f:
        f.seek(size - 1)
        f.write(b"\0x00")

def main():
    file_name = "temp.bin"

    if not os.path.exists(file_name):
        # mmap映射的時候不能映射空文件,所以我們自己創建一個
        create_file(file_name, 1024)

    fd = os.open(file_name, os.O_RDWR)
    with mmap.mmap(fd, 0) as m:  # m.resize(1024) # 大小可以自己調整的
        print("[進程2]PID:%d,PPID:%d" % (os.getpid(), os.getppid()))
        time.sleep(1)
        print(m.readline().decode().strip())
        m.write("進程2說:為毛不去?\n".encode())
        exit(0)

if __name__ == '__main__':
    main()

輸出圖示: 2.mmap無血緣關系進程通信

 

2.4.6.進程間通信~Signal信號

代碼實例:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/Linux/進程通信/6.signal

信號:它是一種異步的通知機制,用來提醒進程一個事件已經發生。當一個信號發送給一個進程,操作系統中斷了進程正常的控制流程,此時,任何非原子操作都將被中斷。如果進程定義了信號的處理函數,那么它將被執行,否則就執行默認的處理函數。

一般信號不太用於進程間通信,常用就是發個信號把xxx進程干死。

先來個例子,等會講理論:

Python里面一般用os.kill(pid,signalnum)來發信號:eg:kill 9 pid

import os
import time
import signal

def main():
    pid = os.fork()
    if pid == 0:
        print("[子進程]PID=%d,PPID=%d" % (os.getpid(), os.getppid()))
        while True:
            print("[子進程]孩子老卵,怎么滴吧~")
            time.sleep(1)
    elif pid > 0:
        print("[父進程]PID=%d,PPID=%d" % (os.getpid(), os.getppid()))
        time.sleep(3)
        print("父進程耐心有限,准備殺了兒子")

        # sigkill 相當於kill 9 pid
        os.kill(pid, signal.SIGKILL)  # 發信號

        # 收屍
        wpid, status = os.wait()
        print("父進程收屍:子進程PID=%d,Status=%d" % (wpid, status))

if __name__ == '__main__':
    main()

輸出:

[父進程]PID=21841,PPID=5559
[子進程]PID=21842,PPID=21841
[子進程]孩子老卵,怎么滴吧~
[子進程]孩子老卵,怎么滴吧~
[子進程]孩子老卵,怎么滴吧~
父進程耐心有限,准備殺了兒子
父進程收屍:子進程PID=21842,Status=9

擴展一下:

  1. signal.pthread_kill(thread_id,signal.SIGKILL)) # 殺死線程
  2. os.abort() # 給自己發異常終止信號

理論開始

這邊開始說說理論:

信號狀態

  1. 產生狀態
  2. 未決狀態(信號產生后沒有被處理)
  3. 遞達狀態(信號已經傳達到進程中)

產生、傳遞等都是通過內核進行的,結合上面的例子畫個圖理解下: 3.signal圖示

未決信號集:沒有被當前進程處理的信號集合(可以通過signal.sigpending()獲取set集合)

阻塞信號集:要屏蔽的信號(不能被用戶操作)

回顧一下上面說kill 9 pid原理的知識:kill -l

 1) SIGHUP   2) SIGINT   3) SIGQUIT  4) SIGILL   5) SIGTRAP
 6) SIGABRT  7) SIGBUS   8) SIGFPE   9) SIGKILL 10) SIGUSR1
11) SIGSEGV 12) SIGUSR2 13) SIGPIPE 14) SIGALRM 15) SIGTERM
16) SIGSTKFLT   17) SIGCHLD 18) SIGCONT 19) SIGSTOP 20) SIGTSTP
21) SIGTTIN 22) SIGTTOU 23) SIGURG  24) SIGXCPU 25) SIGXFSZ
26) SIGVTALRM   27) SIGPROF 28) SIGWINCH    29) SIGIO   30) SIGPWR
31) SIGSYS  34) SIGRTMIN    35) SIGRTMIN+1  36) SIGRTMIN+2  37) SIGRTMIN+3
38) SIGRTMIN+4  39) SIGRTMIN+5  40) SIGRTMIN+6  41) SIGRTMIN+7  42) SIGRTMIN+8
43) SIGRTMIN+9  44) SIGRTMIN+10 45) SIGRTMIN+11 46) SIGRTMIN+12 47) SIGRTMIN+13
48) SIGRTMIN+14 49) SIGRTMIN+15 50) SIGRTMAX-14 51) SIGRTMAX-13 52) SIGRTMAX-12
53) SIGRTMAX-11 54) SIGRTMAX-10 55) SIGRTMAX-9  56) SIGRTMAX-8  57) SIGRTMAX-7
58) SIGRTMAX-6  59) SIGRTMAX-5  60) SIGRTMAX-4  61) SIGRTMAX-3  62) SIGRTMAX-2
63) SIGRTMAX-1  64) SIGRTMAX

說下常用的幾個信號:

  1. 9號信號(sigkill)是kill 9
  2. 2號信號(sigint)是Ctrl+C終止進程
  3. 3號信號(sigquit)是Ctrl+\終止進程

信號捕捉

現在說說信號捕捉signal.signal(signalnum, handler)

handler處理函數,除了自定義信號處理函數外也可以使用系統提供的兩種方式:

  1. SIG_IGN(忽略該信號)
  2. SIG_DFL(系統默認操作)

注意一點:SIGSTOPSIGKILL 信號是不能被捕獲、忽略和阻塞的(這個是系統預留的,如果連預留都沒有可以想象肯定木馬橫向)

PS:信號的優先級一般都是比較高的,往往進程收到信號后都會停下手上的事情先處理信號(死循環也一樣歇菜)

來看一個例子:(處理singint,忽略sigquit)

import os
import time
import signal

def print_info(signalnum, frame):
    print("信號:%d准備弄我,我是小強我怕誰?(%s)" % (signalnum, frame))

def main():
    signal.signal(signal.SIGINT, print_info)  # 處理Ctrl+C的終止命令(singint)
    signal.signal(signal.SIGQUIT, signal.SIG_IGN)  # 忽略Ctrl+\的終止命令(sigquit)

    while True:
        print("[PID:%d]我很堅強,不退出,等着信號來遞達~" % os.getpid())
        time.sleep(3)  # 你要保證進程不會退出才能處理信號,不用擔心影響信號(優先級高)

if __name__ == '__main__':
    main()

輸出圖示:(我休息3s,在3s內給程序發送了sigint信號(Ctrl+C)就立馬處理了) signal信號捕捉

擴展:

  1. 如果你只是等一個信號就退出,可以使用:signal.pause(),不必使用死循環來輪詢了
  2. os.killpg(pgid, sid)進程組結束
  3. signal.siginterrupt(signal.SIGALRM, False) 防止系統調用被信號打斷所設立(其實一般也不太用,出問題才用)

通俗的講就是,要是系統和你發一樣的信號可能也就被處理了,加上這句就ok了,eg:

舉個例子,有時候有些惡意程序蓄意破壞或者被所謂的安全軟件誤殺比如系統函數kill(-1)【有權限的都殺了】

import signal

def print_info(signalnum, frame):
    print("死前留言:我被信號%d弄死了,記得替我報仇啊!" % signalnum)

def main():
    signal.signal(signal.SIGINT, print_info)  # 處理Ctrl+C的終止命令(singint)
    signal.signal(signal.SIGQUIT, print_info)  # 處理Ctrl+\的終止命令(singquit)
    signal.siginterrupt(signal.SIGINT, False)
    signal.siginterrupt(signal.SIGQUIT, False)
    signal.pause()  # 設置一個進程到休眠狀態直到接收一個信號

if __name__ == '__main__':
    main()

輸出:

dnt@MZY-PC:~/桌面/work/BaseCode/python/5.concurrent/Linux/進程通信/6.signal python3 1.os_kill2.py 
^C死前留言:我被信號2弄死了,記得替我報仇啊!
dnt@MZY-PC:~/桌面/work/BaseCode/python/5.concurrent/Linux/進程通信/6.signal python3 1.os_kill2.py 
^\死前留言:我被信號3弄死了,記得替我報仇啊!
dnt@MZY-PC:~/桌面/work/BaseCode/python/5.concurrent/Linux/進程通信/6.signal

定時器alarm(執行一次)

再說兩個定時器就進下一個話題把,這個主要就是信號捕捉用得比較多,然后就是一般都是守護進程發信號

先驗證一個概念:alarm鬧鍾不能被fork后的子進程繼承

import os
import time
import signal

def main():
    # 不受進程影響,每個進程只能有一個定時器,再設置只是重置
    signal.alarm(3)  # 設置終止時間(3s),然后終止進程(sigaltirm)

    pid = os.fork()
    if pid == 0:
        print("[子進程]PID=%d,PPID=%d" % (os.getpid(), os.getppid()))
        for i in range(5):
            print("[子進程]孩子老卵,怎么滴吧~")
            time.sleep(1)
    elif pid > 0:
        print("[父進程]PID=%d,PPID=%d" % (os.getpid(), os.getppid()))

    print("[遺言]PID=%d,PPID=%d" % (os.getpid(), os.getppid()))

if __name__ == '__main__':
    main()

輸出

[父進程]PID=9687,PPID=9063
[遺言]PID=9687,PPID=9063
[子進程]PID=9688,PPID=9687
[子進程]孩子老卵,怎么滴吧~
[子進程]孩子老卵,怎么滴吧~
[子進程]孩子老卵,怎么滴吧~
[子進程]孩子老卵,怎么滴吧~
[子進程]孩子老卵,怎么滴吧~
[遺言]PID=9688,PPID=1060

這個你可以自己驗證:不受進程影響,每個進程只能有一個定時器,再設置只是重置

普及一個小技巧

其實好好看逆天的問題都會發現各種小技巧的,所有小技巧自我總結一下就會產生質變了

import signal

def main():
    signal.alarm(1)  # 設置終止時間(3s),然后終止進程(sigaltirm)
    i = 0
    while True:
        print(i)
        i += 1  # 別忘記,Python里面沒有++哦~

if __name__ == '__main__':
    main()

運行一下:time python3 xxx.py time1 運行一下:time python3 xxx.py > temp time2

簡單說下三個參數:

  1. real總共運行時間(real=user+sys+損耗時間)
  2. user(用戶代碼真正運行時間)
  3. sys(內核運行時間)【內核不運行,你系統也不正常了】

其實就是減少了IO操作,性能方面就相差幾倍!我這邊只是一台老電腦,要是真在服務器下性能相差可能讓你嚇一跳

現在知道為什么要realase發布而不用debug直接部署了吧(線上項目非必要情況,一般都會刪除所有日記輸出的

定時器setitimer(周期執行)

signal.setitimer(which, seconds, interval=0.0) which參數說明:

  1. signal.TIMER_REAL:按實際時間計時,計時到達將給進程發送SIGALRM信號
  2. signal.ITIMER_VIRTUAL:僅當進程執行時才進行計時。計時到達將發送SIGVTALRM信號給進程。
  3. signal.ITIMER_PROF:當進程執行時和系統為該進程執行動作時都計時。與ITIMER_VIRTUAL是一對,該定時器經常用來統計進程在用戶態和內核態花費的時間。計時到達將發送SIGPROF信號給進程。

這個一般在守護進程中經常用,看個簡單案例:

import time
import signal

def say_hai(signalnum, frame):
    print("我會周期性執行哦~")

def main():
    # 捕捉信號(在前面最好,不然容易漏捕獲)
    signal.signal(signal.SIGALRM, say_hai)
    # 設置定時器,第一次1s后執行,以后都3s執行一次
    signal.setitimer(signal.ITIMER_REAL, 1, 3)
    # print(signal.getitimer(signal.ITIMER_REAL))

    while True:
        print("我在做其他事情")
        time.sleep(1)

if __name__ == '__main__':
    main()

輸出:

我在做其他事情
我會周期性執行哦~
我在做其他事情
我在做其他事情
我在做其他事情
我會周期性執行哦~
我在做其他事情
我在做其他事情
我在做其他事情
我會周期性執行哦~
我在做其他事情
我在做其他事情
我在做其他事情
...
 

2.4.7.進程守護

實例代碼:"https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/Linux/進程守護

守護進程應用場景很多,比如程序上線后有個bug被不定時的觸發,每次都導致系統爆卡或者退出,而程序員修復bug需要時間,但是線上項目又不能掛,這時候就可以使用一個心跳檢測的守護進程(查錯也可以使用守護進程)【為惡就不說了】

正式開始前,先來個偽案例:

模擬一個漏洞百出的程序

import os
import time

def main():
    print("[PID:%d]進程運行中..." % os.getpid())
    time.sleep(5)
    os.abort()  # 給自己發異常終止信號

if __name__ == '__main__':
    main()

寫個簡單版本的守護進程:

import os
import time
import signal

def write_log(msg):
    pass

def is_running(p_name):
    """是否在運行"""
    try:
        # grep -v grep 不顯示grep本身,wc -l是計數用的
        result = os.popen("ps ax | grep %s | grep -v grep" % p_name).readlines()
        if len(result) > 0:
            return True
        else:
            return False
    except Exception as ex:
        write_log(ex)
        return False

def is_restart(p_script):
    """重啟程序"""
    try:
        if os.system(p_script) == 0:
            return True
        else:
            return False
    except Exception as ex:
        write_log(ex)
        return False

def heartbeat(signalnum, frame):
    """心跳檢查"""
    p_name = "test.py"
    p_script = "python3 ./test.py"

    if not is_running(p_name):
        print("程序(%s)已掛,准備重啟" % p_name)
        if not is_restart(p_script):
            is_restart(p_script)  # 再給一次機會

def main():
    # 信號處理
    signal.signal(signal.SIGALRM, heartbeat)
    # 第一次1s后檢查,以后每5s檢查一次
    signal.setitimer(signal.ITIMER_REAL, 1, 5)
    while True:
        time.sleep(5)  # 不用擔心影響signal(優先級別高)

if __name__ == '__main__':
    main()

輸出:

程序(test.py)已掛,准備重啟
[PID:7270]進程運行中...
Aborted (core dumped)
程序(test.py)已掛,准備重啟
[PID:7278]進程運行中...
Aborted (core dumped)
[PID:7284]進程運行中...
.....

正規流程的守護進程

寫了個偽牌子的,現在說說正規的,看看概念的東西:

特點

  1. 后台服務進程
  2. 脫離於控制終端(setpid)
  3. 周期性的執行某個任務|等待某個事件發生(setitimer)
  4. 不受用戶登錄注銷影響(關機影響,不過你可以添加啟動項)
  5. 一般使用以d結尾的服務名稱(約定俗成)

講正式流程前先復習一下上面說的進程組會話

  1. 進程組:每一個進程都屬於一個“進程組”,當一個進程被創建的時候,它默認是其父進程所在組的成員(你們一家
  2. 會 話:幾個進程組又構成一個會話(你們小區

需要擴充幾點:

  1. 進程組

    1. 組長:第一個進程
    2. 組長ID==進程組ID
    3. 組長掛了不影響進程組
  2. 會話

    1. 組長不能創建會話(你都有官了,不留點門路給后人?)
    2. 創建會話的進程成為新進程組的組長(新進程組里面就它一個嘛)
    3. 創建出新會話會丟棄原有的控制終端(到了新環境里面,人脈得重新建立)

稍微驗證一下,然后步入正題:

import os
import time

def main():
    pid = os.fork()
    if pid == 0:
        for i in range(7):
            print("子進程:PID=%d,PPID=%d,PGrpID=%d" % (os.getpid(), os.getppid(), os.getpgrp()))
            time.sleep(i)
    elif pid > 0:
        print("父進程:PID=%d,PPID=%d,PGrpID=%d" % (os.getpid(), os.getppid(), os.getpgrp()))
        time.sleep(4)

    print("遺言:PID=%d,PPID=%d,PGrpID=%d" % (os.getpid(), os.getppid(), os.getpgrp()))

if __name__ == '__main__':
    main()

驗證結果:父進程ID==進程組ID父進程掛了進程組依舊在,順便驗證了下ps -ajx的參數 7.psjax

先看看這個SessionID是啥:

import os
import time

def main():
    print("進程:PID=%d,PPID=%d,PGrpID=%d" % (os.getpid(), os.getppid(), os.getpgrp()))
    print(os.getsid(os.getpid()))
    for i in range(1, 5):
        time.sleep(i)
    print("over")

if __name__ == '__main__':
    main()

ps ajx的參數現在全知道了:PPID PID PGID SID (你不加grep就能看到的) session_test]

驗證一下SessionID的事情:

In [1]:
# 驗證一下父進程不能創建會話ID
import os

def main():
    pid = os.getpid()
    print("進程:PPID=%d,PID=%d,GID=%d,SID=%d" % (pid, os.getppid(), os.getpgrp(),os.getsid(pid)))
    os.setsid() # 父進程沒法設置為會話ID的驗證


if __name__ == '__main__':
    main()
 
進程:PPID=3301,PID=2588,GID=3301,SID=3301
 
---------------------------------------------------------------------------
PermissionError                           Traceback (most recent call last)
<ipython-input-1-375f70009fcf> in <module>()
      8 
      9 if __name__ == '__main__':
---> 10main()

<ipython-input-1-375f70009fcf> in main()
      4     pid = os.getpid()
      5     print("進程:PPID=%d,PID=%d,GID=%d,SID=%d" % (pid, os.getppid(), os.getpgrp(),os.getsid(pid)))
----> 6os.setsid() # 父進程沒法設置為會話ID的驗證
      7 
      8 

PermissionError: [Errno 1] Operation not permitted
 

步入正軌:

創建守護進程的步驟

  1. fork子進程,父進程退出(子進程變成了孤兒)
  2. 子進程創建新會話(創建出新會話會丟棄原有的控制終端)
  3. 改變當前工作目錄【為了減少bug】(eg:你在某個文件夾下運行,這個文件夾被刪了,多少會點受影響)
  4. 重置文件掩碼(繼承了父進程的文件掩碼,通過umask(0)重置一下,這樣可以獲取777權限)
  5. 關閉文件描述符(既然用不到了,就關了)
  6. 自己的邏輯代碼

先簡單弄個例子實現上面步驟:

import os
import time
from sys import stdin, stdout, stderr

def main():

    # 【必須】1. fork子進程,父進程退出(子進程變成了孤兒)
    pid = os.fork()
    if pid > 0:
        exit(0)

    # 【必須】2. 子進程創建新會話(創建出新會話會丟棄原有的控制終端)
    os.setsid()

    # 3. 改變當前工作目錄【為了減少bug】# 改成不會被刪掉的目錄,比如/
    os.chdir("/home/dnt")  # 我這邊因為是用戶創建的守護進程,就放它下面,用戶刪了,它也沒必要存在了

    # 4. 重置文件掩碼(獲取777權限)
    os.umask(0)

    # 5. 關閉文件描述符(如果寫日志也可以重定向一下)
    os.close(stdin.fileno())
    os.close(stdout.fileno())
    os.close(stderr.fileno())

    # 【必須】6. 自己的邏輯代碼
    while True:
        time.sleep(1)

if __name__ == '__main__':
    main()

運行效果:(直接后台走起了) create_pro


基礎回顧

如果對Linux基礎不熟,可以看看幾年前說的LinuxBase:

Linux基礎命令:http://www.cnblogs.com/dunitian/p/4822807.html

Linux系列其他文章:https://www.cnblogs.com/dunitian/p/4822808.html#linux


如果對部署運行系列不是很熟,可以看之前寫的小demo:

用Python3、NetCore、Shell分別開發一個Ubuntu版的定時提醒(附NetCore跨平台兩種發布方式):https://www.cnblogs.com/dotnetcrazy/p/9111200.html


如果對OOP不是很熟悉可以查看之前寫的OOP文章:

Python3 與 C# 面向對象之~封裝https://www.cnblogs.com/dotnetcrazy/p/9202988.html

Python3 與 C# 面向對象之~繼承與多態https://www.cnblogs.com/dotnetcrazy/p/9219226.html

Python3 與 C# 面向對象之~異常相關https://www.cnblogs.com/dotnetcrazy/p/9219751.html


如果基礎不牢固,可以看之前寫的PythonBase:

Python3 與 C# 基礎語法對比(Function專欄)https://www.cnblogs.com/dotnetcrazy/p/9175950.html

Python3 與 C# 擴展之~模塊專欄https://www.cnblogs.com/dotnetcrazy/p/9253087.html

Python3 與 C# 擴展之~基礎衍生https://www.cnblogs.com/dotnetcrazy/p/9278573.html

Python3 與 C# 擴展之~基礎拓展https://www.cnblogs.com/dotnetcrazy/p/9333792.html


現在正兒八經的來個簡化版的守護進程:(你可以根據需求多加點信號處理)

import os
import time
import signal
from sys import stdin, stdout, stderr

class Daemon(object):
    def __init__(self, p_name, p_script):
        self.p_name = p_name
        self.p_script = p_script

    @staticmethod
    def write_log(msg):
        # 追加方式寫
        with open("info.log", "a+") as f:
            f.write(msg)
            f.write("\n")

    def is_running(self, p_name):
        """是否在運行"""
        try:
            # grep -v grep 不顯示grep本身,wc -l是計數用的
            result = os.popen(
                "ps ax | grep %s | grep -v grep" % p_name).readlines()
            if len(result) > 0:
                return True
            else:
                return False
        except Exception as ex:
            self.write_log(ex)
            return False

    def is_restart(self, p_script):
        """重啟程序"""
        try:
            if os.system(p_script) == 0:
                return True
            else:
                return False
        except Exception as ex:
            self.write_log(ex)
            return False

    def heartbeat(self, signalnum, frame):
        """心跳檢查"""
        if not self.is_running(self.p_name):
            self.write_log("[%s]程序(%s)已掛,准備重啟" % (time.strftime("%Y-%m-%d%X"),
                                                  self.p_name))
            if not self.is_restart(self.p_script):
                self.is_restart(self.p_script)  # 再給一次機會

    def run(self):
        """運行守護進程"""
        pid = os.fork()
        if pid > 0:
            exit(0)

        os.setsid()  # 子進程創建新會話
        os.chdir("/home/dnt")  # 改變當前工作目錄
        os.umask(0)  # 獲取777權限

        # 5. 關閉文件描述符
        os.close(stdin.fileno())
        os.close(stdout.fileno())
        os.close(stderr.fileno())

        # 【必須】6. 自己的邏輯代碼
        # 捕捉設置的定時器
        signal.signal(signal.SIGALRM, self.heartbeat)
        # 第一次2s后執行,以后5s執行一次
        signal.setitimer(signal.ITIMER_REAL, 2, 5)

        self.write_log("[%s]daeman running" % time.strftime("%Y-%m-%d%X"))
        self.write_log("p_name:%s,p_script:%s" % (self.p_name, self.p_script))

        while True:
            time.sleep(5)  # 不用擔心影響signal(優先級別高)

def main():
    try:
        pro = Daemon("test.py", "python3 ~/demo/test.py")
        pro.run()
    except Exception as ex:
        Daemon.write_log(ex)

if __name__ == '__main__':
    main()

運行效果:(關閉文件描述符后就不要printf了)

10.mini

擴展說明,如果你要文件描述符重定向的話可以這么寫:

with open("in.log", "a+") as f:
    os.dup2(f.fileno(), sys.stdin.fileno())
with open("out.log", "a+") as f:
    os.dup2(f.fileno(), sys.stdout.fileno())
with open("err.log", "a+") as f:
    os.dup2(f.fileno(), sys.stderr.fileno())

之后你printf就自動到指定的文件了

 

擴展說明:

Socket,在講基礎最后一個系列~網絡編程的時候會講,不急,而且進程間通信不需要這么‘重量級’

線程相關打算和代碼一起講,有機會也可以單獨拉出來說一個結尾篇


業余拓展:

官方文檔大全

進程間通信和網絡

os - 其他操作系統接口

mmap - 內存映射文件支持

signal - 設置異步事件的處理程序

Other:

Linux下0、1、2號進程
https://blog.csdn.net/gatieme/article/details/51484562
https://blog.csdn.net/gatieme/article/details/51532804
https://blog.csdn.net/gatieme/article/details/51566690

Linux 的啟動流程
http://www.ruanyifeng.com/blog/2013/08/linux_boot_process.html
http://www.ruanyifeng.com/blog/2016/03/systemd-tutorial-commands.html
http://www.ruanyifeng.com/blog/2016/03/systemd-tutorial-part-two.html

孤兒進程與僵屍進程
https://www.cnblogs.com/Anker/p/3271773.html
https://blog.csdn.net/believe_s/article/details/77040494

Python2 OS模塊之進程管理
https://www.cnblogs.com/now-fighting/p/3534185.html

緩沖區的個人理解
https://blog.csdn.net/lina_acm/article/details/51865543

深入Python多進程編程基礎
https://zhuanlan.zhihu.com/p/37370577
https://zhuanlan.zhihu.com/p/37370601

python多進程實現進程間通信實例
https://www.jb51.net/article/129016.htm

PIPE2參考:
https://bugs.python.org/file22147/posix_pipe2.diff
https://stackoverflow.com/questions/30087506/event-driven-system-call-in-python
https://stackoverflow.com/questions/5308080/python-socket-accept-nonblocking/5308168

FIFO參考:
https://blog.csdn.net/an_tang/article/details/68951819
https://blog.csdn.net/firefoxbug/article/details/8137762

Python之mmap內存映射模塊(大文本處理)說明
https://www.cnblogs.com/zhoujinyi/p/6062907.html

python 基於mmap模塊的jsonmmap實現本地多進程內存共享
https://www.cnblogs.com/dacainiao/p/5914114.html

如果把一個事務可看作是一個程序,它要么完整的被執行,要么完全不執行。這種特性就叫原子性。
https://blog.csdn.net/Android_Mrchen/article/details/77866490

事務四大特征:原子性,一致性,隔離性和持久性
https://blog.csdn.net/u014079773/article/details/52808193

python 、mmap 實現內存數據共享
https://www.jianshu.com/p/c3afc0f02560
http://www.cnblogs.com/zhoujinyi/p/6062907.html
https://blog.csdn.net/zhaohongyan6/article/details/71158522

Python信號相關:
https://my.oschina.net/guol/blog/136036

Linux--進程組、會話、守護進程
https://www.cnblogs.com/forstudy/archive/2012/04/03/2427683.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM