使用Python調用Nessus 接口實現自動化掃描


@

之前在項目中需要接入nessus掃描器,研究了一下nessus的api,現在將自己的成果分享出來。
Nessus提供了豐富的二次開發接口,無論是接入其他系統還是自己實現自動化掃描,都十分方便。
同時Nessus也提供了完備的API文檔,可以在 Settings->My Account->API Keys->API documentation
nessus圖片

認證

nessus提供兩種認證方式,第一種采用常規的登錄后獲取token的方式,在https://localhost:8834/api#/resources/session條目中可以找到這種方式,它的接口定義如下:

POST /session

{
	"username":{string},
	"password":{string}
}

輸入正確的用戶名和密碼,登錄成功后會返回一個token
···
{
"token": {string}
}
···

在后續請求中,將token放入請求頭信息中請求頭的key為X-Cookie,值為 token=xxxx,例如 :X-Cookie: token=5fa3d3fd97edcf40a41bb4dbdfd0b470ba45dde04ebc37f8;,下面是獲取任務列表的例子

import requests
import json
def get_token(ip, port, username, password):
    url = "https://{0}:{1}/session".format(ip, port)
    post_data = {
        'username': username,
        'password': password
    }
    
    respon = requests.post(url, data=post_data, verify=False)
    if response.status_code == 200:
        data = json.loads(response.text)
        return data["token"]

def get_scan_list()
    # 這里ip和port可以從配置文件中讀取或者從數據庫讀取,這里我省略了獲取這些配置值得操作
    url = "https://{ip}:{port}/scans".format(ip, port)
    token = get_token(ip, port, username, password)
    if token:
        header = {
            "X-Cookie":"token={0}".format(token),
            "Content-Type":"application/json"
        }
        response = requests.get(url, headers=header, verify=False)
        if response.status_code == 200:
            result = json.loads(respon.text)
            return result

第二種方式是使用Nessus生成的API Key,這里我們可以依次點擊 Settings->My Account->API Keys-->Generate按鈕,生成一個key,后續使用時填入頭信息中,還是以獲取掃描任務列表作為例子

def get_scan_list()
    accessKey = "XXXXXX" #此處填入真實的內容
    secretKey = "XXXXXX" #此處填入真實內容
    
    url = "https://{ip}:{port}/scans".format(ip, port)
    token = get_token(ip, port, username, password)
    if token:
        header = {
            'X-ApiKeys': 'accessKey={accesskey};secretKey={secretkey}'.format(accesskey=accessKey, secretkey=secretKey)
            "Content-Type":"application/json"
        }
        response = requests.get(url, headers=header, verify=False)
        if response.status_code == 200:
            result = json.loads(respon.text)
            return result

對比來看使用第二種明顯方便一些,因此后續例子都采用第二種方式來呈現

策略模板配置

策略模板的接口文檔在 https://localhost:8834/api#/resources/policies 中。

創建策略模板

創建策略模板使用的是 策略模板的create接口,它里面有一個必須填寫的參數 uuid 這個參數是一個uuid值,表示以哪種現有模板進行創建。在創建之前需要先獲取系統中可用的模板。獲取的接口是 /editor/{type}/templates,type 可以選填policy或者scan。這里我們填policy

一般我們都是使用模板中的 Advanced 來創建,如下圖
在這里插入圖片描述
下面是獲取該模板uuid的方法,主要思路是獲取系統中所有模板,然后根據模板名稱返回對應的uuid值

def get_nessus_template_uuid(ip, port, template_name = "advanced"):
    header = {
        'X-ApiKeys': 'accessKey={accesskey};secretKey={secretkey}'.format(accesskey=accesskey,
                                                                          secretkey=secretkey),
        'Content-type': 'application/json',
        'Accept': 'text/plain'}

    api = "https://{ip}:{port}/editor/scan/templates".format(ip=ip, port=port)
    response = requests.get(api, headers=header, verify=False)
    templates = json.loads(response.text)['templates']

    for template in templates:
        if template['name'] == template_name:
            return template['uuid']
    return None

有了這個id之后,下面來創建策略模板,這個接口的參數較多,但是很多參數都是選填項。

這個部分文檔寫的很簡陋,很多參數不知道是干嘛用的,當時我為了搞清楚每個參數的作用,一個個的嘗試,然后去界面上看它的效果,最后終於把我感興趣的給弄明白了。
它的主體部分如下:

{
   "uuid": {template_uuid},
   "audits": {
       "feed": {
           "add": [
               {
                   "id": {audit_id},
                   "variables": {
                       "1": {audit_variable_value},
                       "2": {audit_variable_value},
                       "3": {audit_variable_value}
                   }
               }
           ]
       }
   },
   "credentials": {
       "add": {
           {credential_category}: {
               {credential_name}: [
                   {
                       {credential_input_name}: {string}
                   }
               ]
           }
       }
   },
   "plugins": {
       {plugin_family_name}: {
           "status": {string},
           "individual": {
               {plugin_id}: {string}
           }
       }
   },
   "scap": {
       "add": {
           {scap_category}: [
               {
                   {scap_input_name}: {string}
               }
           ]
       }
   },
   "settings": {
       "acls": [
           permission Resource
       ],
       //其他的減值對,這里我將他們都省略了
}

他們與界面上配置的幾個大項有對應關系,能對應的上的我給做了標記,但是有的部分對應不上。
在這里插入圖片描述
settings 是給策略模板做基礎配置的,包括配置掃描的端口范圍,服務檢測范圍等等。
credentials 是配置登錄掃描的,主要包括 windows、ssh、telnet等等
plugins 配置掃描使用的插件,例如服務掃描版本漏洞等等

在settings中,對應關系如下圖所示
host scan
port scan
在這里插入圖片描述
下面是創建掃描策略模板的實際例子:

def create_template(ip, port, **kwargs): # kwargs 作為可選參數,用來配置settings和其他項
    header = {
        "X-ApiKeys": "accessKey={accesskey};secretKey={secretkey}".format(accesskey=accesskey,
                                                                          secretkey=secretkey),
        "Content-Type": "application/json",
        "Accept": "text/plain"
    }
    policys = {}

	# 這里 grouppolicy_set 存儲的是策略模板中各個腳本名稱以及腳本是否啟用的信息
    for policy in grouppolicy_set:
        enabled = "enabled" if policy.enable else "disabled"
        policys[policy.name] = {
            "status": enabled
        }
    
    # settings里面的各小項必須得帶上,否則會創建不成功
    "settings": {
        "name": template.name,
        "watchguard_offline_configs": "",
        "unixfileanalysis_disable_xdev": "no",
        "unixfileanalysis_include_paths": "",
        "unixfileanalysis_exclude_paths": "",
        "unixfileanalysis_file_extensions": "",
        "unixfileanalysis_max_size": "",
        "unixfileanalysis_max_cumulative_size": "",
        "unixfileanalysis_max_depth": "",
        "unix_docker_scan_scope": "host",
        "sonicos_offline_configs": "",
        "netapp_offline_configs": "",
        "junos_offline_configs": "",
        "huawei_offline_configs": "",
        "procurve_offline_configs": "",
        "procurve_config_to_audit": "Saved/(show config)",
        "fortios_offline_configs": "",
        "fireeye_offline_configs": "",
        "extremeos_offline_configs": "",
        "dell_f10_offline_configs": "",
        "cisco_offline_configs": "",
        "cisco_config_to_audit": "Saved/(show config)",
        "checkpoint_gaia_offline_configs": "",
        "brocade_offline_configs": "",
        "bluecoat_proxysg_offline_configs": "",
        "arista_offline_configs": "",
        "alcatel_timos_offline_configs": "",
        "adtran_aos_offline_configs": "",
        "patch_audit_over_telnet": "no",
        "patch_audit_over_rsh": "no",
        "patch_audit_over_rexec": "no",
        "snmp_port": "161",
        "additional_snmp_port1": "161",
        "additional_snmp_port2": "161",
        "additional_snmp_port3": "161",
        "http_login_method": "POST",
        "http_reauth_delay": "",
        "http_login_max_redir": "0",
        "http_login_invert_auth_regex": "no",
        "http_login_auth_regex_on_headers": "no",
        "http_login_auth_regex_nocase": "no",
        "never_send_win_creds_in_the_clear": "yes" if kwargs["never_send_win_creds_in_the_clear"] else "no",
        "dont_use_ntlmv1": "yes" if kwargs["dont_use_ntlmv1"] else "no",
        "start_remote_registry": "yes" if kwargs["start_remote_registry"] else "no",
        "enable_admin_shares": "yes" if kwargs["enable_admin_shares"] else "no",
        "ssh_known_hosts": "",
        "ssh_port": kwargs["ssh_port"],
        "ssh_client_banner": "OpenSSH_5.0",
        "attempt_least_privilege": "no",
        "region_dfw_pref_name": "yes",
        "region_ord_pref_name": "yes",
        "region_iad_pref_name": "yes",
        "region_lon_pref_name": "yes",
        "region_syd_pref_name": "yes",
        "region_hkg_pref_name": "yes",
        "microsoft_azure_subscriptions_ids": "",
        "aws_ui_region_type": "Rest of the World",
        "aws_us_east_1": "",
        "aws_us_east_2": "",
        "aws_us_west_1": "",
        "aws_us_west_2": "",
        "aws_ca_central_1": "",
        "aws_eu_west_1": "",
        "aws_eu_west_2": "",
        "aws_eu_west_3": "",
        "aws_eu_central_1": "",
        "aws_eu_north_1": "",
        "aws_ap_east_1": "",
        "aws_ap_northeast_1": "",
        "aws_ap_northeast_2": "",
        "aws_ap_northeast_3": "",
        "aws_ap_southeast_1": "",
        "aws_ap_southeast_2": "",
        "aws_ap_south_1": "",
        "aws_me_south_1": "",
        "aws_sa_east_1": "",
        "aws_use_https": "yes",
        "aws_verify_ssl": "yes",
        "log_whole_attack": "no",
        "enable_plugin_debugging": "no",
        "audit_trail": "use_scanner_default",
        "include_kb": "use_scanner_default",
        "enable_plugin_list": "no",
        "custom_find_filepath_exclusions": "",
        "custom_find_filesystem_exclusions": "",
        "reduce_connections_on_congestion": "no",
        "network_receive_timeout": "5",
        "max_checks_per_host": "5",
        "max_hosts_per_scan": "100",
        "max_simult_tcp_sessions_per_host": "",
        "max_simult_tcp_sessions_per_scan": "",
        "safe_checks": "yes",
        "stop_scan_on_disconnect": "no",
        "slice_network_addresses": "no",
        "allow_post_scan_editing": "yes",
        "reverse_lookup": "no",
        "log_live_hosts": "no",
        "display_unreachable_hosts": "no",
        "report_verbosity": "Normal",
        "report_superseded_patches": "yes",
        "silent_dependencies": "yes",
        "scan_malware": "no",
        "samr_enumeration": "yes",
        "adsi_query": "yes",
        "wmi_query": "yes",
        "rid_brute_forcing": "no",
        "request_windows_domain_info": "no",
        "scan_webapps": "no",
        "start_cotp_tsap": "8",
        "stop_cotp_tsap": "8",
        "modbus_start_reg": "0",
        "modbus_end_reg": "16",
        "hydra_always_enable": "yes" if kwargs["hydra_always_enable"] else "no",
        "hydra_logins_file": "" if kwargs["hydra_logins_file"] else kwargs["hydra_logins_file"], # 弱口令文件需要事先上傳,后面會提到上傳文件接口
        "hydra_passwords_file": "" if kwargs["hydra_passwords_file"] else kwargs["hydra_passwords_file"],
        "hydra_parallel_tasks": "16",
        "hydra_timeout": "30",
        "hydra_empty_passwords": "yes",
        "hydra_login_as_pw": "yes",
        "hydra_exit_on_success": "no",
        "hydra_add_other_accounts": "yes",
        "hydra_postgresql_db_name": "",
        "hydra_client_id": "",
        "hydra_win_account_type": "Local accounts",
        "hydra_win_pw_as_hash": "no",
        "hydra_cisco_logon_pw": "",
        "hydra_web_page": "",
        "hydra_proxy_test_site": "",
        "hydra_ldap_dn": "",
        "test_default_oracle_accounts": "no",
        "provided_creds_only": "yes",
        "smtp_domain": "example.com",
        "smtp_from": "nobody@example.com",
        "smtp_to": "postmaster@[AUTO_REPLACED_IP]",
        "av_grace_period": "0",
        "report_paranoia": "Normal",
        "thorough_tests": "no",
        "detect_ssl": "yes",
        "tcp_scanner": "no",
        "tcp_firewall_detection": "Automatic (normal)",
        "syn_scanner": "yes",
        "syn_firewall_detection": "Automatic (normal)",
        "wol_mac_addresses": "",
        "wol_wait_time": "5",
        "scan_network_printers": "no",
        "scan_netware_hosts": "no",
        "scan_ot_devices": "no",
        "ping_the_remote_host": "yes",
        "tcp_ping": "yes",
        "icmp_unreach_means_host_down": "no",
        "test_local_nessus_host": "yes",
        "fast_network_discovery": "no",

        "arp_ping": "yes" if kwargs["arp_ping"] else "no",
        "tcp_ping_dest_ports": kwargs["tcp_ping_dest_ports"],
        "icmp_ping": "yes" if kwargs["icmp_ping"] else "no",
        "icmp_ping_retries": kwargs["icmp_ping_retries"],
        "udp_ping": "yes" if kwargs["udp_ping"] else "no",
        "unscanned_closed": "yes" if kwargs["unscanned_closed"] else "no",
        "portscan_range": kwargs["portscan_range"],
        "ssh_netstat_scanner": "yes" if kwargs["ssh_netstat_scanner"] else "no",
        "wmi_netstat_scanner": "yes" if kwargs["wmi_netstat_scanner"] else "no",
        "snmp_scanner": "yes" if kwargs["snmp_scanner"] else "no",
        "only_portscan_if_enum_failed": "yes" if kwargs["only_portscan_if_enum_failed"] else "no",
        "verify_open_ports": "yes" if kwargs["verify_open_ports"] else "no",
        "udp_scanner": "yes" if kwargs["udp_scanner"] else "no",
        "svc_detection_on_all_ports": "yes" if kwargs["svc_detection_on_all_ports"] else "no",
        "ssl_prob_ports": "Known SSL ports" if kwargs["ssl_prob_ports"] else "All ports",
        "cert_expiry_warning_days": kwargs["cert_expiry_warning_days"],
        "enumerate_all_ciphers": "yes" if kwargs["enumerate_all_ciphers"] else "no",
        "check_crl": "yes" if kwargs["check_crl"] else "no",
   }
	
	credentials = {
            "add": {
                "Host": {
                    "SSH": [],
                    "SNMPv3": [],
                    "Windows": [],
                },
                "Plaintext Authentication": {
                    "telnet/rsh/rexec": []
                }
            }
        }
        try:
            if kwargs["snmpv3_username"] and kwargs["snmpv3_port"] and kwargs["snmpv3_level"]:
                level = kwargs["snmpv3_level"]
                if level == NessusSettings.LOW:
                    credentials["add"]["Host"]["SNMPv3"].append({
                        "security_level": "No authentication and no privacy",
                        "username": kwargs["snmpv3_username"],
                        "port": kwargs["snmpv3_port"]
                    })
                elif level == NessusSettings.MID:
                    credentials["add"]["Host"]["SNMPv3"].append({
                        "security_level": "Authentication without privacy",
                        "username": kwargs["snmpv3_username"],
                        "port": kwargs["snmpv3_port"],
                        "auth_algorithm": NessusSettings.AUTH_ALG[kwargs["snmpv3_auth"][1]],
                        "auth_password": kwargs["snmpv3_auth_psd"]
                    })
                elif level == NessusSettings.HIGH:
                    credentials["add"]["Host"]["SNMPv3"].append({
                        "security_level": "Authentication and privacy",
                        "username": kwargs["snmpv3_username"],
                        "port": kwargs["snmpv3_port"],
                        "auth_algorithm": NessusSettings.AUTH_ALG[kwargs["snmpv3_auth"]][1],
                        "auth_password": kwargs["snmpv3_auth_psd"],
                        "privacy_algorithm": NessusSettings.PPIVACY_ALG[kwargs["snmpv3_hide"]][1],
                        "privacy_password": kwargs["snmpv3_hide_psd"]
                    })

            if kwargs["ssh_username"] and kwargs["ssh_psd"]:
                credentials["add"]["Host"]["SSH"].append(
                    {
                        "auth_method": "password",
                        "username": kwargs["ssh_username"],
                        "password": kwargs["ssh_psd"],
                        "elevate_privileges_with": "Nothing",
                        "custom_password_prompt": "",
                    })

            if kwargs["windows_username"] and kwargs["windows_psd"]:
                credentials["add"]["Host"]["Windows"].append({
                    "auth_method": "Password",
                    "username": kwargs["windows_username"],
                    "password": kwargs["windows_psd"],
                    "domain": kwargs["ssh_host"]
                })

            if kwargs["telnet_username"] and kwargs["telnet_password"]:
                credentials["add"]["Plaintext Authentication"]["telnet/rsh/rexec"].append({
                    "username": kwargs["telnet_username"],
                    "password": kwargs["telnet_password"]
                })
    
    data = {
            "uuid": get_nessus_template_uuid(terminal, "advanced"),
            "settings": settings,
            "plugins": policys,
            "credentials": credentials
        }
    
    api = "https://{0}:{1}/policies".format(ip, port)
    response = requests.post(api, headers=header, data=json.dumps(data, ensure_ascii=False).encode("utf-8"), # 這里做一個轉碼防止在nessus端發生中文亂碼
                             verify=False)
    if response.status_code == 200:
        data = json.loads(response.text)
        return data["policy_id"] # 返回策略模板的id,后續可以在創建任務時使用
    else:
        return None

策略還有copy、delete、config等操作,這里就不再介紹了,這個部分主要弄清楚各參數的作用,后面的這些接口使用的參數都是一樣的

任務

任務部分的API 在https://localhost:8834/api#/resources/scans

創建任務

創建任務重要的參數如下說明如下:

  1. uuid: 創建任務時使用的模板id,這個id同樣是我們上面說的系統自帶的模板id
  2. name:任務名稱
  3. policy_id:策略模板ID,這個是可選的,如果要使用上面我們自己定義的掃描模板,需要使用這個參數來指定,並且設置上面的uuid為 custom 的uuid,這個值表示使用用戶自定義模板;當然如果就想使用系統提供的,這個字段可以不填
  4. text_targets:掃描目標地址,這個參數是一個數組,可以填入多個目標地址,用來一次掃描多個主機

創建任務的例子如下:

def create_task(task_name, policy_id, hosts): # host 是一個列表,存放的是需要掃描的多台主機
    uuid = get_nessus_template_uuid(terminal, "custom") # 獲取自定義策略的uuid
    if uuid is None:
        return False

    data = {"uuid": uuid, "settings": {
        "name": name,
        "policy_id": policy_id,
        "enabled": True,
        "text_targets": hosts,
        "agent_group_id": []
    }}

    header = {
        'X-ApiKeys': 'accessKey={accesskey};secretKey={secretkey}'.format(accesskey=accesskey,
                                                                          secretkey=secretkey),
        'Content-type': 'application/json',
        'Accept': 'text/plain'}

    api = "https://{ip}:{port}/scans".format(ip=terminal.ip, port=terminal.port)
    response = requests.post(api, headers=header, data=json.dumps(data, ensure_ascii=False).encode("utf-8"),
                             verify=False)
    if response.status_code == 200:
        data = json.loads(response.text)
        if data["scan"] is not None:
            scan = data["scan"]
            # 新增任務擴展信息記錄

            return scan["id"] # 返回任務id

啟動/停止任務

啟動任務的接口為 POST /scans/{scan_id}/launch scan_id 是上面創建任務返回的任務ID, 它有個可選參數 alt_targets,如果這個參數被指定,那么該任務可以掃描這個參數中指定的主機,而之前創建任務時指定的主機將被替代

停止任務的接口為: POST /scans/{scan_id}/stop
下面給出啟動和停止任務的方法

def start_task(task_id, hosts):
    header = {
        'X-ApiKeys': 'accessKey={accesskey};secretKey={secretkey}'.format(accesskey=accesskey,
                                                                          secretkey=secretkey),
        'Content-type': 'application/json',
        'Accept': 'text/plain'}

    data = {
        "alt_targets": [hosts] # 重新指定掃描地址
    }

    api = "https://{ip}:{port}/scans/{scan_id}/launch".format(ip=ip, port=port, scan_id=scan_id)
    response = requests.post(api, data=data, verify=False, headers=header)
    if response.status_code != 200:
        return False
    else:
        return True

def stop_task(task_id):
    header = {
        'X-ApiKeys': 'accessKey={accesskey};secretKey={secretkey}'.format(accesskey=terminal.reserved1,
                                                                          secretkey=terminal.reserved2),
        'Content-type': 'application/json',
        'Accept': 'text/plain'}
    
    api = "https://{ip}:{port}/scans/{scan_id}/stop".format(ip=ip, port=port, task_id)
    response = requests.post(api, headers=header, verify=False)
    if response.status_code == 200 or response.status_code == 409: # 根據nessus api文檔可以知道409 表示任務已結束
        return True

    return False

獲取掃描結果

使用接口 GET /scans/{scan_id} 可以獲取最近一次掃描的任務信息,從接口文檔上看,它還可以獲取某次歷史掃描記錄的信息,如果不填這個參數,接口中會返回所有歷史記錄的id。如果不填歷史記錄id,那么會返回最近一次掃描到的漏洞信息,也就是說新掃描到的信息會把之前的信息給覆蓋

下面是返回信息的部分說明

{
    "info": {
        "edit_allowed": {boolean},
        "status": {string}, //當前狀態 completed 字符串表示結束,cancel表示停止
        "policy": {string},
        "pci-can-upload": {boolean},
        "hasaudittrail": {boolean},
        "scan_start": {string},
        "folder_id": {integer},
        "targets": {string},
        "timestamp": {integer},
        "object_id": {integer},
        "scanner_name": {string},
        "haskb": {boolean},
        "uuid": {string},
        "hostcount": {integer},
        "scan_end": {string},
        "name": {string},
        "user_permissions": {integer},
        "control": {boolean}
    },
    "hosts": [ //按主機區分的漏洞信息
        host Resource
    ],
    "comphosts": [
        host Resource
    ],
    "notes": [
        note Resource
    ],
    "remediations": {
        "remediations": [
            remediation Resource
        ],
        "num_hosts": {integer},
        "num_cves": {integer},
        "num_impacted_hosts": {integer},
        "num_remediated_cves": {integer}
    },
    "vulnerabilities": [
        vulnerability Resource //本次任務掃描到的漏洞信息
    ],
    "compliance": [
        vulnerability Resource
    ],
    "history": [
        history Resource //歷史掃描信息,可以從這個信息中獲取歷史記錄的id
    ],
    "filters": [
        filter Resource
    ]
}

這個信息里面vulnerabilities和host里面都可以拿到漏洞信息,但是 vulnerabilities中是掃描到的所有漏洞信息,而host則需要根據id再次提交請求,也就是需要額外一次請求,但它是按照主機對掃描到的漏洞進行了分類。而使用vulnerabilities則需要根據漏洞信息中的host_id 手工進行分類

下面是獲取任務狀態的示例:

def get_task_status(task_id):
    header = {
        "X-ApiKeys": "accessKey={accesskey};secretKey={secretkey}".format(accesskey=accesskey,
                                                                          secretkey=secretkey),
        "Content-Type": "application/json",
        "Accept": "text/plain"
    }

    api = "https://{ip}:{port}/scans/{task_id}".format(ip=ip, port=port,
                                                       task_id=task_id)
    response = requests.get(api, headers=header, verify=False)
    if response.status_code != 200:
        return 2, "Data Error"

    data = json.loads(response.text)
    hosts = data["hosts"]
    for host in hosts:
        get_host_vulnerabilities(scan_id, host["host_id"]) # 按主機獲取漏洞信息

    if data["info"]["status"] == "completed" or data["info"]["status"] =='canceled':
        # 已完成,此時更新本地任務狀態
    return 1, "OK"

獲取漏洞信息

在獲取任務信息中,已經得到了本次掃描中發現的弱點信息了,只需要我們解析這個json。它具體的內容如下:

"host_id": {integer}, //主機id
"host_index": {string}, 
"hostname": {integer},//主機名稱
"progress": {string}, //掃描進度
"critical": {integer}, //危急漏洞數
"high": {integer}, //高危漏洞數
"medium": {integer}, //中危漏洞數
"low": {integer}, //低危漏洞數
"info": {integer}, //相關信息數目
"totalchecksconsidered": {integer},
"numchecksconsidered": {integer},
"scanprogresstotal": {integer},
"scanprogresscurrent": {integer},
"score": {integer}

根據主機ID可以使用 GET /scans/{scan_id}/hosts/{host_id} 接口獲取主機信息,它需要兩個參數,一個是掃描任務id,另一個是主機id。
下面列舉出來的是返回值得部分內容,只列舉了我們感興趣的部分:

{
    "info": {
        "host_start": {string},
        "mac-address": {string},
        "host-fqdn": {string},
        "host_end": {string},
        "operating-system": {string},
        "host-ip": {string}
    },
    
    "vulnerabilities": [
        {
		    "host_id": {integer}, //主機id
		    "hostname": {string}, //主機名稱
		    "plugin_id": {integer}, //策略id
		    "plugin_name": {string}, //策略名稱
		    "plugin_family": {string}, //所屬策略組
		    "count": {integer}, //該種漏洞數
		    "vuln_index": {integer}, 
		    "severity_index": {integer},
		    "severity": {integer}
		}
    ],
}

根據上面獲取任務信息中得到的主機id和任務id,我們可以實現這個功能

def get_host_vulnerabilities(scan_id, host_id):
    header = {
        "X-ApiKeys": "accessKey={accesskey};secretKey={secretkey}".format(accesskey=accesskey,
                                                                          secretkey=secretkey),
        "Content-Type": "application/json",
        "Accept": "text/plain"
    }

    scan_history = ScanHistory.objects.get(id=scan_id)
    api = "https://{ip}:{port}/scans/{task_id}/hosts/{host_id}".format(ip=ip, port=port, task_id=scan_id, host_id=host_id)
    response = requests.get(api, headers=header, verify=False)
    if response.status_code != 200:
        return 2, "Data Error"

    data = json.loads(response.text)
    vulns = data["vulnerabilities"]
    for vuln in vulns:
        vuln_name = vuln["plugin_name"]
        plugin_id = vuln["plugin_id"] #插件id,可以獲取更詳細信息,包括插件自身信息和掃描到漏洞的解決方案等信息
        #保存漏洞信息

獲取漏洞輸出信息與漏洞知識庫信息

我們在nessus web頁面中可以看到每條被檢測到的漏洞在展示時會有輸出信息和知識庫信息,這些信息也可以根據接口來獲取
在這里插入圖片描述
獲取漏洞的知識庫可以通過接口 GET /scans/{scan_id}/hosts/{host_id}/plugins/{plugin_id} , 它的路徑為: https://localhost:8834/api#/resources/scans/plugin-output

它返回的值如下:

{
    "info": {
        "plugindescription": {
            "severity": {integer}, //危險等級,從info到最后的critical依次為1,2,3,4,5
            "pluginname": {string}, 
            "pluginattributes": {
                "risk_information": {
                    "risk_factor": {string}
                },
                "plugin_name": {string}, //插件名稱
                "plugin_information": {
                    "plugin_id": {integer},
                    "plugin_type": {string},
                    "plugin_family": {string},
                    "plugin_modification_date": {string}
                },
                "solution": {string}, //漏洞解決方案
                "fname": {string},
                "synopsis": {string},
                "description": {string} //漏洞描述
            },
            "pluginfamily": {string},
            "pluginid": {integer}
        }
    },
    "output": [
        plugin_output:{
           "plugin_output": {string}, //輸出信息
		   "hosts": {string}, //主機信息
		   "severity": {integer}, 
		   "ports": {} //端口信息
        }
    ]
}

有了這些信息,我們可以通過下面的代碼獲取這些信息:

def get_vuln_detail(scan_id, host_id, plugin_id)
    header = {
        "X-ApiKeys": "accessKey={accesskey};secretKey={secretkey}".format(accesskey=accesskey,
                                                                          secretkey=secretkey),
        "Content-Type": "application/json",
        "Accept": "text/plain"
    }

    api = "https://{ip}:{port}/scans/{scan_id}/hosts/{host_id}/plugins/{plugin_id}".format(ip=ip, port=port, scan_id=scan_id, host_id=host_id, plugin_id=plugin_id)
    response = requests.get(api, headers=header, verify=False)
    data = json.loads(response.text)
    outputs = data["outputs"]
    return outputs

最后總結

這篇文章我們主要介紹了nessus API從掃描設置到掃描任務創建、啟動、停止、以及結果的獲取的內容,當然nessus的api不止這些,但是最重要的應該是這些,如果能幫助各位解決手頭上的問題自然是極好的,如果不能或者說各位朋友需要更細致的控制,可以使用瀏覽器抓包的方式來分析它的請求和響應包。
在摸索時最好的兩個幫手是瀏覽器 F12工具欄中的 network和nessus api文檔頁面上的test工具了。

我們可以先按 f12 打開工具並切換到network,然后在頁面上執行相關操作,觀察發包即可發現該如何使用這些API,因為Nessus Web端在操作時也是使用API。如下圖:
在這里插入圖片描述
或者可以使用文檔中的test工具,例如下面是測試 獲取插件輸出信息的接口
在這里插入圖片描述


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM