有很多開源自動化運維工具都很好用如ansible/salt stack等,完全不用重復造輪子。只不過,很多運維同學學習Python之后,苦於沒小項目訓練,本篇演示用Python寫一個批量操作主機的工具,大家空余時候可以試着寫寫,完善完善。
1 思路分析
在運維工作中,古老的方式部署環境、上線代碼可能都需要手動在服務器上敲命令,不勝其煩。所以,腳本,自動化工具等還是很有必要的。我覺得一個批量操作工具應該考慮以下幾點:
(1)本質上,就是到遠程主機上執行命令並返回結果。
(2)做到批量。也就是要並發對多台機器進行操作。
(3)將返回的結果,清晰地展示給用戶。
通常開源的主機批量管理工具有兩類,一類是有agent,如SaltStack、Puppet等;另一類是無agent如ansible。雖然我們沒必要重復造輪子,但是可以試着寫一寫,一是加深對這類軟件原理的理解,二是練習Python。建議如果服務器規模在1000台以內的用無agent的方式也能hold住;如果超過1000台,用有agent的會好太多。
接下來我們一起看看怎么具體實現。
2 到遠程機器上執行命令
到遠程機器上執行命令,並返回結果,至少有兩種方式:一是用paramiko模塊;而是可以建立機器互信,從中控執行ssh命令。
下面我把自己封裝好的代碼貼一下,是基於paramiko模塊封裝的,ssh的大家可以自己實現:
import paramiko
class SSHParamiko(object):
err = "argument passwd or rsafile can not be None"
def __init__(self, host, port, user, passwd=None, rsafile=None):
self.h = host
self.p = port
self.u = user
self.w = passwd
self.rsa = rsafile
def _connect(self):
if self.w:
return self.pwd_connect()
elif self.rsa:
return self.rsa_connect()
else:
raise ConnectionError(self.err)
def _transfer(self):
if self.w:
return self.pwd_transfer()
elif self.rsa:
return self.rsa_transfer()
else:
raise ConnectionError(self.err)
def pwd_connect(self):
conn = paramiko.SSHClient()
conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())
conn.connect(self.h, self.p, self.u, self.w)
return conn
def rsa_connect(self):
pkey = paramiko.RSAKey.from_private_key_file(self.rsa)
conn = paramiko.SSHClient()
conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())
conn.connect(hostname=self.h, port=self.p, username=self.u, pkey=pkey)
return conn
def pwd_transfer(self):
transport = paramiko.Transport(self.h, self.p)
transport.connect(username=self.u, password=self.w)
sftp = paramiko.SFTPClient.from_transport(transport)
return sftp, transport
def rsa_transfer(self):
pkey = paramiko.RSAKey.from_private_key_file(self.rsa)
transport = paramiko.Transport(self.h, self.p)
transport.connect(username=self.u, pkey=pkey)
sftp = paramiko.SFTPClient.from_transport(transport)
return sftp, transport
def run_cmd(self, cmd):
conn = self._connect()
stdin, stdout, stderr = conn.exec_command(cmd)
code = stdout.channel.recv_exit_status()
stdout, stderr = stdout.read(), stderr.read()
conn.close()
if not stderr:
return code, stdout.decode()
else:
return code, stderr.decode()
def get_file(self, remote, local):
sftp, conn = self._transfer()
sftp.get(remote, local)
conn.close()
def put_file(self, local, remote):
sftp, conn = self._transfer()
sftp.put(local, remote)
conn.close()
當然,代碼還可以重構一下哈。接下來我們看下效果:
if __name__ == '__main__':
h = "我的測試機IP"
p = 22
u = "我的用戶名"
w = "我的密碼"
obj = SSHParamiko(h, p, u, w)
r = obj.run_cmd("df -h")
print(r[0])
print(r[1])
執行之后的結果是:
0 Filesystem Size Used Avail Use% Mounted on /dev/vda1 40G 3.4G 34G 9% / devtmpfs 3.9G 0 3.9G 0% /dev tmpfs 3.9G 0 3.9G 0% /dev/shm tmpfs 3.9G 410M 3.5G 11% /run tmpfs 3.9G 0 3.9G 0% /sys/fs/cgroup /dev/vdb 300G 12G 289G 4% /search/odin tmpfs 783M 0 783M 0% /run/user/0 tmpfs 783M 0 783M 0% /run/user/1000
可以清晰看到第一行是命令執行的狀態碼,0表示成功,非0表示失敗;第二行開始就是我們的命令返回結果。是不是比較清晰呢?
3 並發執行並展示輸出結果
並發執行通常用Python3自帶的線程模塊就行,這里我用的from concurrent.futures import ThreadPoolExecutor。並且當拿到結果之后,我還做了一些格式化輸出,比如綠色輸出表示成功,紅色輸出表示命令執行失敗,黃色表示提醒等。廢話不多說,直接看代碼吧!
from concurrent.futures import ThreadPoolExecutor
class AllRun(object):
def __init__(self, ssh_objs, cmds, max_worker=50):
self.objs = [o for o in ssh_objs]
self.cmds = [c for c in cmds]
self.max_worker = max_worker # 最大並發線程數
self.success_hosts = [] # 存放成功機器數目
self.failed_hosts = [] # 存放失敗的機器IP
self.mode = None
self.func = None
def serial_exec(self, obj):
"""單台機器上串行執行命令,並返回結果至字典"""
result = list()
for c in self.cmds:
r = obj.run_cmd(c)
result.append([c, r])
return obj, result
def concurrent_run(self):
"""並發執行"""
future = ThreadPoolExecutor(self.max_worker)
for obj in self.objs:
try:
future.submit(self.serial_exec, obj).add_done_callback(self.callback)
except Exception as err:
err = self.color_str(err, "red")
print(err)
future.shutdown(wait=True)
def callback(self, future_obj):
"""回調函數,處理返回結果"""
ssh_obj, rlist = future_obj.result()
print(self.color_str("{} execute detail:".format(ssh_obj.h), "yellow"))
is_success = True
for item in rlist:
cmd, [code, res] = item
info = f"{cmd} | code => {code}\nResult:\n{res}"
if code != 0:
info = self.color_str(info, "red")
is_success = False
if ssh_obj.h not in self.failed_hosts:
self.failed_hosts.append(ssh_obj.h)
else:
info = self.color_str(info, "green")
print(info)
if is_success:
self.success_hosts.append(ssh_obj.h)
if ssh_obj.h in self.failed_hosts:
self.failed_hosts.remove(ssh_obj.h)
def overview(self):
"""展示總的執行結果"""
for i in self.success_hosts:
print(self.color_str(i, "green"))
print("-" * 30)
for j in self.failed_hosts:
print(self.color_str(j, "red"))
info = "Success hosts {}; Failed hosts {}."
s, f = len(self.success_hosts), len(self.failed_hosts)
info = self.color_str(info.format(s, f), "yellow")
print(info)
@staticmethod
def color_str(old, color=None):
"""給字符串添加顏色"""
if color == "red":
new = "\033[31;1m{}\033[0m".format(old)
elif color == "yellow":
new = "\033[33;1m{}\033[0m".format(old)
elif color == "blue":
new = "\033[34;1m{}\033[0m".format(old)
elif color == "green":
new = "\033[36;1m{}\033[0m".format(old)
else:
new = old
return new
if __name__ == '__main__':
h1 = "adime01.shouji.sjs.ted"
p1 = 22
u1 = "odin"
w1 = "*****"
h = "10.129.206.97"
p = 22
u = "root"
w = "*****"
obj1 = SSHParamiko(h1, p1, u1, w1)
obj = SSHParamiko(h, p, u, w)
cmds = ["df -h", "ls"]
all_obj = AllRun([obj1, obj], cmds)
all_obj.concurrent_run()
all_obj.overview()
上述代碼運行的結果:

從執行結果來看,高亮顯示,清新明了。既顯示了各個主機的各個命令執行狀態碼,返回結果,最后還匯總結果,成功了多少台機器和失敗了多少台機器。
我們還可以換一下執行的命令,讓命令執行失敗看看:

后期還可以包裝一下,將主機、密碼、批量執行的命令寫在配置文件中;或再根據需要包裝成命令行工具,在日常運維工作中可以適當減少人肉敲命令的繁瑣。
