Python 3 利用 subprocess 實現管道( pipe )交互操作讀/寫通信


這里我們用Windows下的shell來舉例:

from subprocess import * #因為是舉例,就全部導入了

為了方便你理解,我們用一個很簡單的一段代碼來說明:

可以看見我們利用Popen實例化了一個p,創建了子程序cmd.exe,然后我們給他的的Stdin(標准輸入流)Stdout(標准輸出流);

同時使用了subprocess.PIPE 作為參數,這個是一個特殊值,用於表明這些通道要開放(在Python3.5,加入了run()方法來進行更好的操作)

 

然后我們繼續

這些信息是不是很眼熟?這都是cmd的標准輸出!

然后就會輸出這些:

我們剛剛所寫入的信息"echo Hellwworlds\r\n"已經被寫入了,看起來確實成功了!

注意一下我們使用了 p.stdin.flush() 來對輸入緩存區進行刷新,輸出的信息也需要一個 "\r\n",至少在 Windows 系統下必須這樣做,否則只刷新(p.stdin.flush)的話是無效的;

我們到底做了什么?


我們成功的創建了子程序 cmd.exe,並且寫入"echo Hellwworlds\r\n" ,然后cmd獲取了並且執行,於是返回 Hellwworlds,這就是一次很簡單的讀寫交互!

 

更高級的使用


 

既然我們已經可以簡單的讀寫了,那么加點for和threading 吧,味道也許更佳喔~

 1 #run.py  
 2 
 3 from subprocess import * 
 4 import threading 
 5 import time
 6 
 7 p =Popen('cmd.exe',shell=True,stdin=PIPE,stdout=PIPE)
 8 
 9 def run():
10     global p
11     while True:
12         line = p.stdout.readline() 
13         if not line:  #空則跳出
14             break
15         print(">>>>>>",line.decode("GBK"))
16 
17     print("look up!!! EXIT ===")   #跳出
18 
19 
20 w =threading.Thread(target=run)
21 
22 p.stdin.write("echo HELLW_WORLD!\r\n".encode("GBK"))
23 p.stdin.flush()
24 time.sleep(1) #延遲是因為等待一下線程就緒
25 p.stdin.write("exit\r\n".encode("GBK"))
26 p.stdin.flush()
27 
28 w.start()

 

很好很好,猜猜輸出什么?

有很多換行的原因是cmd返回的結果有換行,然后print輸出會加一個換行,所以就換了兩行,你可以考慮使用 sys.stdout.write 來輸出,這樣就沒有附加的換行了

這樣的話,你可以制作一個基礎的讀寫了,那么我們開始封裝吧。

 

 

封裝Pipe


 不廢話了,直接上代碼,如果你真的想學會的話,還請認真自己讀讀代碼。

 110行

我們實現了將所有的過程集中在一個類里面,並且可以定義三個參數,退出反饋函數,就緒反饋函數和輸出反饋函數。

  1 # -*- coding:utf-8 -*-
  2 
  3 import subprocess  
  4 import sys
  5 import threading
  6 
  7 class LoopException(Exception):
  8     """循環異常自定義異常,此異常並不代表循環每一次都是非正常退出的"""
  9     def __init__(self,msg="LoopException"):
 10         self._msg=msg
 11 
 12     def __str__(self):
 13         return self._msg
 14 
 15 
 16 
 17 class SwPipe():
 18     """
 19     與任意子進程通信管道類,可以進行管道交互通信
 20     """
 21     def __init__(self,commande,func,exitfunc,readyfunc=None,
 22         shell=True,stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE,code="GBK"):
 23         """
 24         commande 命令
 25         func 正確輸出反饋函數
 26         exitfunc 異常反饋函數
 27         readyfunc 當管道創建完畢時調用
 28         """
 29         self._thread = threading.Thread(target=self.__run,args=(commande,shell,stdin,stdout,stderr,readyfunc))
 30         self._code = code
 31         self._func = func
 32         self._exitfunc = exitfunc
 33         self._flag = False
 34         self._CRFL = "\r\n"
 35 
 36     def __run(self,commande,shell,stdin,stdout,stderr,readyfunc):
 37         """ 私有函數 """
 38         try:
 39             self._process = subprocess.Popen(
 40                 commande,
 41                 shell=shell,
 42                 stdin=stdin,
 43                 stdout=stdout,
 44                 stderr=stderr
 45                 )  
 46         except OSError as e:
 47             self._exitfunc(e)
 48         fun = self._process.stdout.readline
 49         self._flag = True
 50         if readyfunc != None:
 51             threading.Thread(target=readyfunc).start() #准備就緒
 52         while True:
 53             line = fun()  
 54             if not line:  
 55                 break
 56             try:
 57                 tmp = line.decode(self._code)
 58             except UnicodeDecodeError:
 59                 tmp =  \
 60                 self._CRFL + "[PIPE_CODE_ERROR] <Code ERROR: UnicodeDecodeError>\n" 
 61                 + "[PIPE_CODE_ERROR] Now code is: " + self._code + self._CRFL
 62             self._func(self,tmp)
 63 
 64         self._flag = False
 65         self._exitfunc(LoopException("While Loop break"))   #正常退出
 66 
 67 
 68     def write(self,msg):
 69         if self._flag:
 70             #請注意一下這里的換行
 71             self._process.stdin.write((msg + self._CRFL).encode(self._code)) 
 72             self._process.stdin.flush()
 73             #sys.stdin.write(msg)#怎么說呢,無法直接用代碼發送指令,只能默認的stdin
 74         else:
 75             raise LoopException("Shell pipe error from '_flag' not True!")  #還未准備好就退出
 76 
 77 
 78     def start(self):
 79         """ 開始線程 """
 80         self._thread.start()
 81 
 82     def destroy(self):
 83         """ 停止並銷毀自身 """
 84         process.stdout.close()
 85         self._thread.stop()
 86         del self
 87        
 88 
 89 
 90 
 91 
 92 
 93 if __name__ == '__main__':   #那么我們來開始使用它吧  94     e = None
 95 
 96     #反饋函數
 97     def event(cls,line):#輸出反饋函數
 98         sys.stdout.write(line)
 99 
100     def exit(msg):#退出反饋函數
101         print(msg)
102 
103     def ready():#線程就緒反饋函數
104         e.write("dir")  #執行 105         e.write("ping www.baidu.com")
106         e.write("echo Hello!World 你好中國!你好世界!")
107         e.write("exit")
108 
109     e = SwPipe("cmd.exe",event,exit,ready)
110     e.start()

輸出:

 

 你可以看見,我們的指令都順序的執行了。當然了這里面還有OS的功勞。

 

那么你的可擴展的Pipe類應該已經構建完畢了吧?

A: 我之所以要在這種代碼前面加入行數的原因就是為了防止你復制;因為你可能永遠不會明白這里究竟發生了什么,而是只懂得了使用。

順便一提:

最好去參考一下官方的文檔,已經講得非常詳細了。subprocess.Popen.communicate 或許更適合你,看你是要進行什么事情。

參考:

https://docs.python.org/3/library/subprocess.html

到此結束,如有錯誤之處還望指正。

 

不論是否對你有幫助,感謝你耐心閱讀

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM