Python-subprocess執行命令並將輸出劫持實現實時記錄到日志
前言
在寫我自己的練手項目的時候,需要寫一系列Python腳本來幫助我進行運維/環境配置,我希望這些腳本能夠有比較好的日志記錄。
這一篇博客中,我實現了日志同時向控制台和日志中進行輸出,並且二者的日志等級、日志格式不相同。
這一篇博客中,我通過自定義日志的格式描述符,實現了定長的日志等級。
解決完日志問題后,我需要解決這些腳本的另一個問題:執行命令。
我寫這一系列Python腳本就是因為我不擅長寫shell腳本,而且Python腳本在我的開發環境(Windows)和運行環境(Linux)都可以運行。其需要擔當的很重要的一個功能就是執行一些命令,如kubectl apply
、mvn test
等。
我希望執行這些命令的時候,能夠實時地將命令的輸出,記錄到日志中。
本博客,我實現了執行命令,並將命令執行的輸出進行劫持,實時記錄到日志中。
參考
subprocess
關於多進程協同,Python主要有三個手段:
- os.system()函數;
- multiprocessing模塊;
- subprocess模塊;
其中multiprocessing
主要用於解決計算密集型計算中,Python由於GIL(全局解釋器鎖)的存在,多線程無法提升性能,而需要創建多個進程來加快計算的場景。
而os.system()
則是阻塞式的,而且似乎無法將其創建的子進程的輸出進行劫持。
subprocess
模塊幾乎是唯一的選擇。
Popen
subprocess
雖然提供了簡易使用的subprocess.run()
方法,但是這個方法無法做到實時輸出,也就是命令執行完成之后才能一次性獲取命令的輸出,即使傳入了stdout=subprocess.PIPE
要求其創建一個管道,仍舊是阻塞式的讀取。stdout
也可以指定為一個流,因此我們可以將其直接重定向到本程序的標准輸出,但是logger並不是一個流。
所以我們只能使用更為底層的subprocess.Popen對象來進行操作。
process = subprocess.Popen(
cmd,
shell=True,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
通過這樣的參數,創建子進程運行cmd
指定的命令,同時創建與子進程關聯的process
對象,此時process.stdout
和process.stderr
都是可以調用readline
方法來一行行讀取字符串的管道。
實現
通過subprocess.Popen()
創建Popen
對象后,子進程開始運行,我們可以通過管道來讀取子進程的輸出。因為有兩個管道都需要讀取,並且它們沒有提供檢查管道是否讀取的方法,只能阻塞地嘗試進行讀取,所以我們需要創建讀取線程來分別讀取這兩個管道。
實現如下:
import logging
import subprocess
import shlex
import threading
class CommandExecutionException(Exception):
def __init__(self, command: str, exit_code: int) -> None:
super().__init__(f"command executed fail with exit-code={exit_code}: {command}")
_logger = logging.getLogger(__name__)
class TextReadLineThread(threading.Thread):
def __init__(self, readline, callback, *args, **kargs) -> None:
super().__init__(*args, **kargs)
self.readline = readline
self.callback = callback
def run(self):
for line in iter(self.readline, ""):
if len(line) == 0:
break
self.callback(line)
def cmd_exec(command: str, ensure_success: bool=True) -> int:
_logger.info("executing command: {}".format(command))
cmd = shlex.split(command)
process = subprocess.Popen(
cmd,
shell=True,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
_logger.debug("started command")
def log_warp(func):
def _wrapper(line: str):
return func("\t" + line.strip())
return _wrapper
read_stdout = TextReadLineThread(process.stdout.readline, log_warp(_logger.info))
read_stderr = TextReadLineThread(process.stderr.readline, log_warp(_logger.warning))
read_stdout.start()
read_stderr.start()
read_stdout.join()
_logger.debug("stdout reading finish")
read_stderr.join()
_logger.debug("stderr reading finish")
ret = process.wait()
_logger.debug("process finish")
_logger.info("executed command with exit-code={}".format(ret))
if ensure_success and ret != 0:
raise CommandExecutionException(command=command, exit_code=ret)
return ret
if __name__ == '__main__':
_logger_trans = {
"DEBUG": "DBG",
"INFO": "INF",
"WARNING": "WAR",
"CRITICAL": "ERR"
}
_old_factory = logging.getLogRecordFactory()
def factory(name, level, fn, lno, msg, args, exc_info, func=None, sinfo=None, **kwargs)->logging.LogRecord:
record = _old_factory(name, level, fn, lno, msg, args, exc_info, func, sinfo, **kwargs)
record.shortlevelname = _logger_trans[record.levelname]
return record
logging.setLogRecordFactory(factory)
logging.basicConfig(
level=logging.DEBUG,
format='[%(asctime)s %(shortlevelname)s] %(message)s',
datefmt="%Y/%m/%d %H:%M:%S"
)
cmd_exec("ping localhost", ensure_success=False)
在TextReadLineThread
的run方法中,我們使用了內置函數iter()
的兩個參數的形式,其第一個參數為一個可調用對象,第二個參數為該可調用對象可能的輸出,通過iter(func, sentinel)
函數產生的迭代器,每次迭代時都無參數地調用func
,直到func()
返回sentinel
時結束迭代。
關於main
中日志配置,參考這一篇博客和這一篇博客,對日志格式進行增強,使得我們能夠明確看到子進程的輸出被劫持了。
這里我創建了兩個線程,一個用於讀取stdout
,一個用於讀取stderr
,然后阻塞掉當前線程。其實可以只創建一個新的線程用於讀取stderr
,用當前線程讀取stdout
。
我使用了ping
命令來進行測試,因為ping
明顯是隔一段時間輸出一段,可以明顯看出是不是實時進行日志記錄。但是linux上ping
默認是不會停止的,需要注意一下。