Yearning sql工單系統 自動執行工單python腳本


Yearning是一個mysql開源sql語句審核平台,工單流程:開發者提交sql工單->主管審核->運維執行

如果你只是一個工單執行者可以屏蔽一些關鍵詞后自動執行工單

#!/usr/bin/python3
# -*- coding: utf-8 -*-
import requests
import jsonpath
import time
import sys
import json


def get_token():
    # 獲取認證的token
    data = {
        "username": "",
        "password": ""
    }
    headers = {
        "Accept": "application/json"
    }
    request = requests.post(api_url + "ldapauth", data=data, headers=headers)
    request = request.json()
    token = jsonpath.jsonpath(request, "$.token")[0]
    return token


def get_work():
    # 獲取工單列表
    headers = {
        "Authorization": "JWT" + " " + token
    }
    request = requests.get(
        api_url + 'audit_sql?page=1&query={"picker":["",""],"user":"","valve":false}',
        headers=headers)
    return request.json()


def get_sql():
    # 獲取工單里面的sql
    headers = {
        "Accept": "application/json",
        "Authorization": "JWT" + " " + token
    }
    request = requests.get(
        api_url + 'getsql?id=' + str(id) + '&bundle_id=' + str(bundle_id),
        headers=headers)
    return request.json()


def audit_sql():
    # 執行工單
    data = {
        "type": 1,
        "to_user": username,
        "id": id
    }
    data = json.dumps(data)
    headers = {
        "Content-Type": "application/json",
        "Authorization": "JWT" + " " + token
    }
    request = requests.put(api_url + 'audit_sql', data=data, headers=headers)
    return request.text


if __name__ == "__main__":
    api_url = "https://yearning.example.com/api/v1/"
    token = get_token()
    work_list = get_work()
    for v in work_list['data']:
        id = v['id']
        bundle_id = v['bundle_id']
        status = v['status']
        type = v['type']
        username = v['username']
        work_id = v['work_id']
        if status == 2:           # 工單狀態等於2則代表待執行
            sql = get_sql()['sql']
            sql = str.lower(sql)
            keyword = ["drop", "truncate", "grant", "lock"]
            for i in range(0, len(keyword)):
                key = keyword[i]
                if key in sql:
                    print("停止執行有敏感詞:" + key)
                    sys.exit()
            audit_sql_return = audit_sql()   #執行工單
            now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            log = now_time + ' ' + audit_sql_return + ' ' + work_id + ' "' + sql + '"'
            log_file = '/tmp/audit-sql.log'
            print(log)
            f = open(log_file, 'a')
            f.write(log + '\n')
            f.close()
            time.sleep(2)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM