Yearning 是一個開源的 MySQL SQL 語句審核平台，工單流程：開發者提交 SQL 工單 -> 主管審核 -> 運維執行。
如果你只是一個工單執行者，可以在屏蔽一些關鍵詞後自動執行工單。
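#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""Auto-execute pending Yearning SQL work orders after a keyword screen.

Yearning's ticket flow is: a developer submits a SQL ticket, a supervisor
approves it, and operations executes it. This script automates the last
step for tickets in status 2 (approved, waiting for execution): it logs in,
lists tickets, fetches each pending ticket's SQL, refuses to run it when a
sensitive keyword occurs, and otherwise calls the ``audit_sql`` endpoint to
execute it, appending one entry per ticket to a log file.

The keyword screen is a plain case-insensitive substring blocklist, not a
SQL parser. It is intentionally broad (it also matches identifiers that
merely contain a keyword, e.g. a column called ``block``) and it does not
replace the supervisor approval step.
"""
import json
import os
import sys
import time

import jsonpath
import requests

# Base URL of the Yearning API; every endpoint below is appended to it.
API_URL = "https://yearning.example.com/api/v1/"
# LDAP login credentials. They are read from the environment (the variable
# names are this script's own choice) so they are not committed with the
# script; the empty-string fallback matches the original placeholders.
USERNAME = os.environ.get("YEARNING_USERNAME", "")
PASSWORD = os.environ.get("YEARNING_PASSWORD", "")
# A ticket whose lower-cased SQL contains any of these substrings is not run.
SENSITIVE_KEYWORDS = ("drop", "truncate", "grant", "lock")
# One line per processed ticket is appended here. NOTE(review): /tmp is
# shared and world-writable, so it is a weak home for an execution audit
# trail; a dedicated, access-controlled log directory is preferable.
LOG_FILE = "/tmp/audit-sql.log"
# Seconds to wait for each API call. requests has no default timeout, so
# without this a hung server would block the job indefinitely.
HTTP_TIMEOUT = 30
# Pause between executed tickets, as in the original script.
SLEEP_BETWEEN_TICKETS = 2
# Status value the original script treats as "approved, waiting to execute".
STATUS_PENDING_EXECUTION = 2


def _auth_headers(token, extra=None):
    """Return request headers carrying the JWT *token*, merged with *extra*."""
    headers = {"Authorization": f"JWT {token}"}
    if extra:
        headers.update(extra)
    return headers


def get_token(username=USERNAME, password=PASSWORD, api_url=API_URL):
    """Log in through the ``ldapauth`` endpoint and return the JWT token.

    Raises requests.HTTPError on a non-2xx reply and RuntimeError when the
    JSON reply carries no ``token`` field.
    """
    response = requests.post(
        api_url + "ldapauth",
        data={"username": username, "password": password},
        headers={"Accept": "application/json"},
        timeout=HTTP_TIMEOUT,
    )
    response.raise_for_status()
    found = jsonpath.jsonpath(response.json(), "$.token")
    # jsonpath returns False (not an empty list) when nothing matches.
    if not found:
        raise RuntimeError("ldapauth reply contains no token")
    return found[0]


def get_work(token, api_url=API_URL):
    """Return the decoded ``audit_sql`` listing (page 1, unfiltered).

    The ``query`` filter is sent verbatim as in the original script; its
    fields belong to Yearning and are not interpreted here.
    """
    response = requests.get(
        api_url + "audit_sql",
        params={"page": "1", "query": '{"picker":["",""],"user":"","valve":false}'},
        headers=_auth_headers(token),
        timeout=HTTP_TIMEOUT,
    )
    response.raise_for_status()
    return response.json()


def get_sql(token, ticket_id, bundle_id, api_url=API_URL):
    """Return the decoded ``getsql`` reply for one ticket.

    The caller reads the SQL text from the reply's ``sql`` key.
    """
    response = requests.get(
        api_url + "getsql",
        params={"id": str(ticket_id), "bundle_id": str(bundle_id)},
        headers=_auth_headers(token, {"Accept": "application/json"}),
        timeout=HTTP_TIMEOUT,
    )
    response.raise_for_status()
    return response.json()


def audit_sql(token, ticket_id, to_user, api_url=API_URL):
    """Execute ticket *ticket_id* (``type`` 1) and return the raw reply body.

    The body is returned even for a non-2xx reply so that the caller logs
    whatever the server said instead of losing it to an exception.
    """
    payload = json.dumps({"type": 1, "to_user": to_user, "id": ticket_id})
    response = requests.put(
        api_url + "audit_sql",
        data=payload,
        headers=_auth_headers(token, {"Content-Type": "application/json"}),
        timeout=HTTP_TIMEOUT,
    )
    return response.text


def find_sensitive_keyword(sql, keywords=SENSITIVE_KEYWORDS):
    """Return the first keyword occurring in lower-cased *sql*, or None.

    Substring matching is deliberate: it over-matches rather than
    under-matches. An empty *sql* never matches.
    """
    lowered = sql.lower()
    return next((word for word in keywords if word in lowered), None)


def _now():
    """Return the local wall-clock time in the log's timestamp format."""
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())


def _append_log(line, log_file=LOG_FILE):
    """Append *line* plus a newline to *log_file*, closing it on any error."""
    with open(log_file, "a", encoding="utf-8") as handle:
        handle.write(line + "\n")


def main():
    """Process every pending ticket in the listing; see the module docstring."""
    token = get_token()
    for ticket in get_work(token)["data"]:
        if ticket["status"] != STATUS_PENDING_EXECUTION:
            continue
        ticket_id = ticket["id"]
        work_id = ticket["work_id"]
        # The original script logged the lower-cased text, so keep doing that.
        sql = get_sql(token, ticket_id, ticket["bundle_id"])["sql"].lower()
        hit = find_sensitive_keyword(sql)
        if hit is not None:
            # Fail closed: stop the whole run so a human looks at the ticket,
            # record why in the log file (not only on stdout), and exit
            # non-zero so a scheduler notices the halt.
            print("停止執行有敏感詞:" + hit)
            _append_log(f'{_now()} 停止執行有敏感詞:{hit} {work_id} "{sql}"')
            sys.exit(1)
        result = audit_sql(token, ticket_id, ticket["username"])
        # The SQL is written verbatim, so a multi-line statement spans
        # several physical log lines.
        line = f'{_now()} {result} {work_id} "{sql}"'
        print(line)
        _append_log(line)
        time.sleep(SLEEP_BETWEEN_TICKETS)


if __name__ == "__main__":
    main()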