# -*- coding: utf-8 -*-
import os
import subprocess
import signal
import pwd
import sys
class MockLogger(object):
'''模擬日志類。方便單元測試。'''
def __init__(self):
self.info = self.error = self.critical = self.debug
def debug(self, msg):
print "LOGGER:"+msg
class Shell(object):
'''完成Shell腳本的包裝。
執行結果存放在Shell.ret_code, Shell.ret_info, Shell.err_info中
run()為普通調用,會等待shell命令返回。
run_background()為異步調用,會立刻返回,不等待shell命令完成
異步調用時,可以使用get_status()查詢狀態,或使用wait()進入阻塞狀態,
等待shell執行完成。
異步調用時,使用kill()強行停止腳本后,仍然需要使用wait()等待真正退出。
TODO 未驗證Shell命令含有超大結果輸出時的情況。
'''
def __init__(self, cmd):
self.cmd = cmd # cmd包括命令和參數
self.ret_code = None
self.ret_info = None
self.err_info = None
#使用時可替換為具體的logger
self.logger = MockLogger()
def run_background(self):
'''以非阻塞方式執行shell命令(Popen的默認方式)。
'''
self.logger.debug("run %s"%self.cmd)
# Popen在要執行的命令不存在時會拋出OSError異常,但shell=True后,
# shell會處理命令不存在的錯誤,因此沒有了OSError異常,故不用處理
self._process = subprocess.Popen(self.cmd, shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE) #非阻塞
def run(self):
'''以阻塞方式執行shell命令。
'''
self.run_background()
self.wait()
def run_cmd(self, cmd):
'''直接執行某條命令。方便一個實例重復使用執行多條命令。
'''
self.cmd = cmd
self.run()
def wait(self):
'''等待shell執行完成。
'''
self.logger.debug("waiting %s"%self.cmd)
self.ret_info, self.err_info = self._process.communicate() #阻塞
# returncode: A negative value -N indicates that the child was
# terminated by signal N
self.ret_code = self._process.returncode
self.logger.debug("waiting %s done. return code is %d"%(self.cmd,
self.ret_code))
def get_status(self):
'''獲取腳本運行狀態(RUNNING|FINISHED)
'''
retcode = self._process.poll()
if retcode == None:
status = "RUNNING"
else:
status = "FINISHED"
self.logger.debug("%s status is %s"%(self.cmd, status))
return status
# Python2.4的subprocess還沒有send_signal,terminate,kill
# 所以這里要山寨一把,2.7可直接用self._process的kill()
def send_signal(self, sig):
self.logger.debug("send signal %s to %s"%(sig, self.cmd))
os.kill(self._process.pid, sig)
def terminate(self):
self.send_signal(signal.SIGTERM)
def kill(self):
self.send_signal(signal.SIGKILL)
def print_result(self):
print "return code:", self.ret_code
print "return info:", self.ret_info
print " error info:", self.err_info
class RemoteShell(Shell):
'''遠程執行命令(ssh方式)。
XXX 含特殊字符的命令可能導致調用失效,如雙引號,美元號$
NOTE 若cmd含有雙引號,可使用RemoteShell2
'''
def __init__(self, cmd, ip):
ssh = ("ssh -o PreferredAuthentications=publickey -o "
"StrictHostKeyChecking=no -o ConnectTimeout=10")
# 不必檢查IP有效性,也不必檢查信任關系,有問題shell會報錯
cmd = '%s %s "%s"'%(ssh, ip, cmd)
Shell.__init__(self, cmd)
class RemoteShell2(RemoteShell):
'''與RemoteShell相同,只是變換了引號。
'''
def __init__(self, cmd, ip):
RemoteShell.__init__(self, cmd, ip)
self.cmd = "%s %s '%s'"%(ssh, ip, cmd)
class SuShell(Shell):
'''切換用戶執行命令(su方式)。
XXX 只適合使用root切換至其它用戶。
因為其它切換用戶后需要輸入密碼,這樣程序會掛住。
XXX 含特殊字符的命令可能導致調用失效,如雙引號,美元號$
NOTE 若cmd含有雙引號,可使用SuShell2
'''
def __init__(self, cmd, user):
if os.getuid() != 0: # 非root用戶直接報錯
raise Exception('SuShell must be called by root user!')
cmd = 'su - %s -c "%s"'%(user, cmd)
Shell.__init__(self, cmd)
class SuShell2(SuShell):
'''與SuShell相同,只是變換了引號。
'''
def __init__(self, cmd, user):
SuShell.__init__(self, cmd, user)
self.cmd = "su - %s -c '%s'"%(user, cmd)
class SuShellDeprecated(Shell):
'''切換用戶執行命令(setuid方式)。
執行的函數為run2,而不是run
XXX 以“不干凈”的方式運行:僅切換用戶和組,環境變量信息不變。
XXX 無法獲取命令的ret_code, ret_info, err_info
XXX 只適合使用root切換至其它用戶。
'''
def __init__(self, cmd, user):
self.user = user
Shell.__init__(self, cmd)
def run2(self):
if os.getuid() != 0: # 非root用戶直接報錯
raise Exception('SuShell2 must be called by root user!')
child_pid = os.fork()
if child_pid == 0: # 子進程干活
uid, gid = pwd.getpwnam(self.user)[2:4]
os.setgid(gid) # 必須先設置組
os.setuid(uid)
self.run()
sys.exit(0) # 子進程退出,防止繼續執行其它代碼
else: # 父進程等待子進程退出
os.waitpid(child_pid, 0)
if __name__ == "__main__":
'''test code'''
# 1. test normal
sa = Shell('who')
sa.run()
sa.print_result()
# 2. test stderr
sb = Shell('ls /export/dir_should_not_exists')
sb.run()
sb.print_result()
# 3. test background
sc = Shell('sleep 1')
sc.run_background()
print 'hello from parent process'
print "return code:", sc.ret_code
print "status:", sc.get_status()
sc.wait()
sc.print_result()
# 4. test kill
import time
sd = Shell('sleep 2')
sd.run_background()
time.sleep(1)
sd.kill()
sd.wait() # NOTE, still need to wait
sd.print_result()
# 5. test multiple command and uncompleted command output
se = Shell('pwd;sleep 1;pwd;pwd')
se.run_background()
time.sleep(1)
se.kill()
se.wait() # NOTE, still need to wait
se.print_result()
# 6. test wrong command
sf = Shell('aaaaa')
sf.run()
sf.print_result()
# 7. test instance reuse to run other command
sf.cmd = 'echo aaaaa'
sf.run()
sf.print_result()
sg = RemoteShell('pwd', '127.0.0.1')
sg.run()
sg.print_result()
# unreachable ip
sg2 = RemoteShell('pwd', '17.0.0.1')
sg2.run()
sg2.print_result()
# invalid ip
sg3 = RemoteShell('pwd', '1711.0.0.1')
sg3.run()
sg3.print_result()
# ip without trust relation
sg3 = RemoteShell('pwd', '10.145.132.247')
sg3.run()
sg3.print_result()
sh = SuShell('pwd', 'ossuser')
sh.run()
sh.print_result()
# wrong user
si = SuShell('pwd', 'ossuser123')
si.run()
si.print_result()
# user need password
si = SuShell('pwd', 'root')
si.run()
si.print_result()