Python實現cmd命令連續執行


之前是想寫一個微信控制程序,通過登錄網頁微信,可以直接執行命令行代碼。也不用ssh登錄了,想法很方便。

但是現實很殘酷,微信登錄這塊基本沒有問題,已經有大佬寫好了,但是命令行執行遇到問題了。

運行cmd

開始時,使用os.popen()執行命令,但是該命令需要手動修改運行目錄。此方案被我直接丟棄了。

單開進程

那么自然想到通過啟動進程的方式來實現,Python有對進程的封裝subprocess,可以通過創建Popen對象來實現。我只要單開一個bash,與它進行交互就好啦。

簡單實現如下:


p = subprocess.Popen('/bin/bash', shell=True, stdin=subprocess.PIPE, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
​
while True:
    c = input()
    c += os.linesep
    p.stdin.write(c.encode('utf8'))
    print(out_s.decode('utf8'), end='')
然后,馬上就有遇到問題了,輸出流一直拿不到內容,被阻塞了。

刷新緩沖區

被阻塞有兩種情況,一輸入流阻塞,所以沒有輸出,二輸出流阻塞。看到網上有的將輸入流關閉就可以了:

p.stdin.close()

但是關閉后就不能再次運行命令了,通過查看其對象方法,發現可以直接刷新緩沖區,很好

p.stdin.flush()

但是發現讀取到的文件只有一行,很明顯,沒有讀完

循環讀取

需要循環讀取輸出緩沖區的內容。

while True:
    out_s = p.stdout.readline()
    print(out_s.decode('utf8'), end='')

 

新的問題出現了,循環怎么結束啊?當緩沖區沒有內容時,readline方法會阻塞等待。

讀取阻塞

很好,找了半天也沒找到解決阻塞的辦法。那就只能靠自己了,既然它要阻塞,那就隨他阻塞好了,我單開一個線程去讀取,讓它一直阻塞去吧。

解決后的完整測試代碼:

import subprocess
import os
import threading
​
​
p = subprocess.Popen('/bin/bash', shell=True, stdin=subprocess.PIPE, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
​
def test():
    global p
    while True:
        print(p.stdout.readline().decode('utf8'), end='')
​
threading.Thread(target=test).start()
​
while True:
    c = input()
    c += os.linesep
    p.stdin.write(c.encode('utf8'))
    p.stdin.flush()

 

很好,問題解決了,簡單封裝一個工具類吧。

注意:如果輸入一個不存在的命令,輸出內容不在stdout流中,要到stderr中獲取。此方案暫時還不支持sudo命令,回頭在研究研究


至此,其實還有一個小問題,我怎么能知道哪些返回是同一條命令所返回的呢?就這個微信工具來說,自然可以直接通過時間判斷,若超過1s沒有,則認為是一組,統一返回。感覺有些牽強,暫時沒有想到更好的解決辦法。

最后奉上工具鏈接:

<https://gitee.com/hujingnb/python_demo>


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM