需求:
在CMDB系統中,我們需要對資產進行采集和資產入庫,包括serverBasic、disk、memory、nic信息等,客戶端需要采集這些硬件的信息,服務端則負責資產入庫,但是需要采集的硬件並不是固定不變的,我們需要根據實際情況適當的添加或者減少硬件信息的采集,所以在生產環境中,我們把每個硬件信息的采集和入庫做成插件,相互獨立,可在配置文件中增減或者移除。
知識點:
字符串的形式導入插件方式:(字符串的格式也就是插件這個類的路徑,這種格式正好也是類的導入格式)
# 根據字符串形式導入模塊,並且找到其中的類並執行 import importlib v = "src.plugins.nic.Nic" module_path,cls_name = v.rsplit('.',maxsplit=1) m = importlib.import_module(module_path) cls = getattr(m,cls_name) obj = cls() obj.process()
設計: (參考django中間件的加載方式)
在settings配置文件中做好插件配置,在加載插件的方法中,使用字符串形式導入類的方法,循環加載各個插件。
settings.py

plugins/__init__.py下的加載插件方法:
def exec_plugin(self): server_info = {} for k,v in self.plugin_items.items(): # 找到v字符串:src.plugins.nic.Nic, # src.plugins.disk.Disk info = {'status':True,'data': None,'msg':None} try: module_path,cls_name = v.rsplit('.',maxsplit=1) module = importlib.import_module(module_path) cls = getattr(module,cls_name) if hasattr(cls,'initial'): obj = cls.initial() else: obj = cls() ret = obj.process(self.exec_cmd,self.test) info['data'] = ret except Exception as e: info['status'] = False info['msg'] = traceback.format_exc() server_info[k] = info return server_info
應用實例(及整個業務邏輯):
客戶端:
執行流程:run.py >> script.py >> client.py >> plugins
run.py,設置全局變量,主函數啟動

import sys import os BASEDIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASEDIR) # 設置全局變量(手動配置路徑) os.environ['AUTO_CLIENT_SETTINGS'] = "conf.settings" # # 導入插件管理類 # from src.plugins import PluginManager from src import script if __name__ == '__main__': script.start()
script.py,約束采集信息模式(agent,ssh,salt),兼容三種模式

from lib.config import settings from .client import AgentClient from .client import SaltSshClient def start(): # 這個函數用來判斷模式,並約束可選模式 if settings.MODE == 'AGENT': obj = AgentClient() elif settings.MODE == "SSH" or settings.MODE == 'SALT': obj = SaltSshClient() else: raise Exception('模式僅支持:AGENT/SSH/SALT') obj.exec()
client.py,三種模式的調用插件,API驗證

import requests,json,time,hashlib from src.plugins import PluginManager from lib.config import settings from concurrent.futures import ThreadPoolExecutor class BaseClient(object): def __init__(self): self.api = settings.API def post_server_info(self,server_dict): # requests.post(self.api,data=server_dict) # 1. k=v&k=v, 2. content-type: application/x-www-form-urlencoded response = requests.post(self.api,json=server_dict,headers={'auth-api':auth_header_val}) # 1. 字典序列化;2. 帶請求頭 content-type: application/json def exec(self): raise NotImplementedError('必須實現exec方法') class AgentClient(BaseClient): # 繼承BaseClient類 def exec(self): obj = PluginManager() server_dict = obj.exec_plugin() print('采集到的服務器信息:',server_dict) self.post_server_info(server_dict) class SaltSshClient(BaseClient): # 繼承BaseClient類 def task(self,host): obj = PluginManager(host) server_dict = obj.exec_plugin() self.post_server_info(server_dict) def get_host_list(self): response = requests.get(self.api,headers={'auth-api':auth_header_val}) print(response.text) # [{"hostname":"c1.com"}] return json.loads(response.text) # return ['c1.com',] def exec(self): pool = ThreadPoolExecutor(10) host_list = self.get_host_list() for host in host_list: # 注意格式,如果host_list為字典,用host.hostname取值 pool.submit(self.task,host)
plugins

__init__.py,初始化插件

import importlib import requests from lib.config import settings import traceback class PluginManager(object): def __init__(self,hostname=None): self.hostname = hostname self.plugin_items = settings.PLUGIN_ITEMS self.mode = settings.MODE self.test = settings.TEST if self.mode == "SSH": self.ssh_user = settings.SSH_USER self.ssh_port = settings.SSH_PORT self.ssh_pwd = settings.SSH_PWD def exec_plugin(self): server_info = {} for k,v in self.plugin_items.items(): # 找到v字符串:src.plugins.nic.Nic, # src.plugins.disk.Disk info = {'status':True,'data': None,'msg':None} try: module_path,cls_name = v.rsplit('.',maxsplit=1) module = importlib.import_module(module_path) cls = getattr(module,cls_name) if hasattr(cls,'initial'): obj = cls.initial() else: obj = cls() ret = obj.process(self.exec_cmd,self.test) info['data'] = ret except Exception as e: info['status'] = False info['msg'] = traceback.format_exc() server_info[k] = info return server_info def exec_cmd(self,cmd): # 調用插件方法時,會把執行命令的方法作為參數傳到插件,line:48 if self.mode == "AGENT": import subprocess result = subprocess.getoutput(cmd) elif self.mode == "SSH": import paramiko ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(hostname=self.hostname, port=self.ssh_port, username=self.ssh_user, password=self.ssh_pwd) stdin, stdout, stderr = ssh.exec_command(cmd) result = stdout.read() ssh.close() elif self.mode == "SALT": import subprocess result = subprocess.getoutput('salt "%s" cmd.run "%s"' %(self.hostname,cmd)) else: raise Exception("模式選擇錯誤:AGENT,SSH,SALT") return result
服務端:
執行流程:views.py >> plugins
settings.py
PLUGIN_ITEMS = { "nic": "api.plugins.nic.Nic", "disk": "api.plugins.disk.Disk", "memory": "api.plugins.memory.Memory", }
views.py:獲取未采集列表,調用寫入數據庫插件
manager = PluginManager() response = manager.exec(server_dict) return HttpResponse(json.dumps(response))
plugins

__init__.py,初始化插件
for k,v in self.plugin_items.items(): print(k,v) # try: module_path,cls_name = v.rsplit('.',maxsplit=1) md = importlib.import_module(module_path) cls = getattr(md,cls_name) obj = cls(server_obj,server_dict[k]) obj.process() print(k,v)