前言
最近整理文件夾,發現了本科完成的這個題目。希望對大家有借鑒意義。
嚴格來講,本文可以實現對漏洞資產的管理。
- 將利用censys下載的bacnet協議資產(包括物聯網設備IP信息、運營商信息、版本信息、位置信息等)寫入ElasticSearch數據庫。並添加vul_name字段用於保存該設備的所有漏洞名稱,以便查詢某物聯網設備的所有漏洞詳情信息。寫入數據庫是為了Python與數據庫連接批量化訪問數據。
- 利用Python(Nessus API)對全部IP進行漏洞掃描得到所有plugin_id(需要去重處理),然后根據plugin_id得到漏洞總資產,最后將全部漏洞信息(漏洞介紹、漏洞名稱、危險等級、CVE編號、漏洞發布時間、漏洞發現時間、解決辦法,連同bacnet協議等)寫入ElasticSearch數據庫以便快速進行漏洞檢索。
數據管理
關於如何將json格式的數據寫入elasticsearch數據庫,請參考博主之前的文章Python-將json文件寫入ES數據庫
下圖為elasticsearch中部分數據截圖。
單個IP掃描
首先了解Nessus是如何掃描的,下圖可以看到掃描98.100.184.253用 時5分鍾,使用的策略是Basic Network Scan,使用的掃描器為Local Scaner, 掃描得到的信息數為7個,從右下角可知有兩個危險等級為medium的漏洞,點擊查看詳情(漏洞描述、解決方案、CVE編號、端口等)
循環掃描
發起掃描與獲取掃描結果都需要進行登錄,使用Nessus的用戶名和密碼及/session接口:POST /session(向/session接口發送POST請求,請求的payload參數作為用戶名和密碼)。若用戶名和密碼正確,/session接口會返回一個token,將token放入請求的頭信息中,請求中帶上頭信息即可以使用。
發起掃描的API接口為/scan/{scan_id}/launch,用POST方法進行調用,掃描的目標需要放在請求的payload中。掃描開始后,需要監聽掃描任務來確定任務是否成功發起、意外或終止,使用的API為GET/scan/{scan_id},根據API的返回值即可得到本次掃描的狀態,每個一段時間發送請求查詢任務狀態即可。
當監聽到任務結束時,就可以處理本次掃描結果。/scans/{scan_id}接口除了返回掃描任務的狀態之外,也返回了一個漏洞列表response[‘vulnerabilities’]和一個主機列表response[‘hosts’],即從這兩個列表獲取漏洞掃描的詳細結果。該部分內容由函數download完成。
vulnerabilities 字段返回的內容如下
"vulnerabilities" :
[ { "plugin_id": {integer},
"plugin_name ": {string} ,
" plugin_family ": {string},
"count" : {integer},
"vuln_index": {integer},
"severity_index " : {integer }]
其為本次漏洞掃描的一個總覽,包含了漏洞數目的統計和漏洞的基本信息,其中最為重要的是plugin_id,可以用於獲取漏洞詳情。用函數extract提取plugin_id。
漏洞信息詳情查詢的接口為GET /plugins/plugin/{id},該接口的返回值如下
{ "id " : { integer } , "name " : {string} , "family_name" : {string}, "attributes " : [{"attribute_name" : {string} , "attribute_value " : {string}}]}
該返回值中attributes字典列表可以用來表示漏洞各類信息,包括漏洞的CVE編號,危險等級,描述,詳情,解決方案等。用函數get_vul_detail獲取漏洞詳情。
詳細代碼如下:
# -*- coding: UTF-8 -*-
import requests
import json
import time
import sys
import os
import re
from elasticsearch import Elasticsearch
#對數據庫里所有IP掃描得到不重復的pluginID再根據pluginID得到漏洞詳細信息文件
requests.packages.urllib3.disable_warnings()
es = Elasticsearch()
url = 'https://localhost:8834'
verify = False
token = ''
username = 'zhang'#你的賬號
password = '**********'#你的密碼
para = {
"_source":"ip"
}
def build_url(resource):
return '{0}{1}'.format(url, resource)
def connect(method, resource, data=None):
"""
Send a request
Send a request to Nessus based on the specified data. If the session token
is available add it to the request. Specify the content type as JSON and
convert the data to JSON format.
"""
headers = {'X-Cookie': 'token={0}'.format(token),
'content-type': 'application/json'}
data = json.dumps(data)
if method == 'POST':
r = requests.post(build_url(resource), data=data, headers=headers, verify=verify)
elif method == 'PUT':
r = requests.put(build_url(resource), data=data, headers=headers, verify=verify)
elif method == 'DELETE':
r = requests.delete(build_url(resource), data=data, headers=headers, verify=verify)
else:
r = requests.get(build_url(resource), params=data, headers=headers, verify=verify)
# Exit if there is an error.
if r.status_code != 200:
e = r.json()
print e['error']
sys.exit()
# When downloading a scan we need the raw contents not the JSON data.
if 'download' in resource:
return r.content
else:
return r.json()
def login(usr, pwd):
"""
Login to nessus.
"""
login = {'username': usr, 'password': pwd}
data = connect('POST', '/session', data=login)
return data['token']
def logout():
"""
Logout of nessus.
"""
connect('DELETE', '/session')
def get_policies():
"""
Get scan policies
Get all of the scan policies but return only the title and the uuid of
each policy.
"""
data = connect('GET', '/editor/policy/templates')
return dict((p['title'], p['uuid']) for p in data['templates'])
def get_history_ids(sid):
"""
Get history ids
Create a dictionary of scan uuids and history ids so we can lookup the
history id by uuid.
"""
data = connect('GET', '/scans/{0}'.format(sid))
return dict((h['uuid'], h['history_id']) for h in data['history'])
def get_scan_history(sid, hid):
"""
Scan history details
Get the details of a particular run of a scan.
"""
params = {'history_id': hid}
data = connect('GET', '/scans/{0}'.format(sid), params)
return data['info']
def add(name, desc, targets, pid):
"""
Add a new scan
Create a new scan using the policy_id, name, description and targets. The
scan will be created in the default folder for the user. Return the id of
the newly created scan.
"""
scan = {'uuid': pid,
'settings': {
'name': name,
'description': desc,
'text_targets': targets}
}
data = connect('POST', '/scans', data=scan)
return data['scan']
def update(scan_id, name, desc, targets, pid=None):
"""
Update a scan
Update the name, description, targets, or policy of the specified scan. If
the name and description are not set, then the policy name and description
will be set to None after the update. In addition the targets value must
be set or you will get an "Invalid 'targets' field" error.
"""
scan = {}
scan['settings'] = {}
scan['settings']['name'] = name
scan['settings']['desc'] = desc
scan['settings']['text_targets'] = targets
if pid is not None:
scan['uuid'] = pid
data = connect('PUT', '/scans/{0}'.format(scan_id), data=scan)
return data
def launch(sid):
"""
Launch a scan
Launch the scan specified by the sid.
"""
data = connect('POST', '/scans/{0}/launch'.format(sid))
return data['scan_uuid']
def status(sid, hid):
"""
Check the status of a scan run
Get the historical information for the particular scan and hid. Return
the status if available. If not return unknown.
"""
d = get_scan_history(sid, hid)
return d['status']
def export_status(sid, fid):
"""
Check export status
Check to see if the export is ready for download.
"""
data = connect('GET', '/scans/{0}/export/{1}/status'.format(sid, fid))
return data['status'] == 'ready'
def export(sid, hid):
"""
Make an export request
Request an export of the scan results for the specified scan and
historical run. In this case the format is hard coded as nessus but the
format can be any one of nessus, html, pdf, csv, or db. Once the request
is made, we have to wait for the export to be ready.
"""
data = {'history_id': hid,
'format': 'nessus'}
data = connect('POST', '/scans/{0}/export'.format(sid), data=data)
fid = data['file']
while export_status(sid, fid) is False:
time.sleep(5)
return fid
def download(sid, fid):
"""
Download the scan results
Download the scan results stored in the export file specified by fid for
the scan specified by sid.
"""
data = connect('GET', '/scans/{0}/export/{1}/download'.format(sid, fid))
filename = 'nessus_{0}_{1}.nessus'.format(sid, fid)
print('Saving scan results to {0}.'.format(filename))
with open(filename, 'w') as f:
f.write(data)
return filename
def extract(file):
outfile = 'output.txt'
count = 0
lines_seen = set()
in_file = open(file, 'r')
out_file = open(outfile, 'a')
lines = in_file.readlines()
for line in lines:
if line not in lines_seen:
str_name = line.split(" ")[0]
str1 = 'pluginId'
if (str1 in str_name):
out_file.write(line)
count += 1
lines_seen.add(line)
in_file.close()
out_file.close()
os.remove(file)
return outfile
def get_vul_detail(file):
vul_detail = {
"cve_number": "",
"vul_name": "",
"vul_intro": "",
"vul_detail": "",
"vul_level": 0,
"solution":"",
"release_time": "",
"discover_time": ""
}
header = {'X-Cookie': 'token={0}'.format(token),
'content-type': 'application/json'}
endfile = 'endfile.txt'
id_file = open(file, 'r')
detail_file = open(endfile,'w')
end = open('end.txt','w+')
str1 = r'\''
str2 = r'"'
lines = id_file.readlines()
for line in lines:
m = re.findall(r'(\w*[0-9]+)\w*', line)
plugin_id = m[0]
url = 'https://localhost:8834/plugins/plugin/{plugin_id}'.format(plugin_id=plugin_id)
respone = requests.get(url, headers=header, verify=False)
if respone is not None:
result = json.loads(respone.text)
# 漏洞名稱
vul_detail['vul_name'] = str(result['name']).encode('utf-8')
# 遍歷attributes生成結果
for attr in result['attributes']:
attr_name = attr['attribute_name']
# cve編號
if attr_name == 'cve':
vul_detail['cve_number'] = str(attr['attribute_value']).encode('utf-8')
continue
# 漏洞描述
elif attr_name == 'synopsis':
vul_detail['vul_intro'] = str(attr['attribute_value']).encode('utf-8')
continue
# 漏洞詳情
elif attr_name == 'description':
vul_detail['vul_detail'] = str(attr['attribute_value']).encode('utf-8')
continue
# 漏洞等級
elif attr_name == 'risk_factor':
vul_detail['risk_factor'] = str(attr['attribute_value']).encode('utf-8')
continue
# 漏洞描述
elif attr_name == 'solution':
vul_detail['solution'] = str(attr['attribute_value']).encode('utf-8')
continue
# 漏洞發布時間
elif attr_name == 'plugin_publication_date':
vul_detail['release_time'] = str(attr['attribute_value']).encode('utf-8')
continue
# 漏洞發現時間
elif attr_name == 'vuln_publication_date':
vul_detail['discover_time'] = str(attr['attribute_value']).encode('utf-8')
continue
detail_file.write(str(vul_detail)+'\n')
detail_file.close()
detail_file = open(endfile, 'r')
for ss in detail_file.readlines():
tt = re.sub(str1, str2, ss)
end.write(tt)
id_file.close()
detail_file.close()
end.close()
os.remove(endfile)
#os.remove(file)
if __name__ == '__main__':
file_puginid = ''
print('Login')
token = login(username, password)
m = 0
flag = 0
array_search = es.search(index="indextest", doc_type="string", params=para, size=5, request_timeout=60)
jsons = array_search["hits"]["hits"]
s = []
for hits in jsons:
s.append(hits["_source"]["ip"])
for ip in s:
IPdone = open('IP.txt', 'a+')
lines = IPdone.readlines()
ss = str(ip).encode('utf-8')+'\n'
size = os.path.getsize('IP.txt')
if size == 0:
pass
else:
for line in lines:
if line == ss:
m = 1
else:
m = 0
IPdone.close()
if m == 0:
flag = 1
IPdone = open('IP.txt', 'a')
IPdone.write(ss)
IPdone.close()
# print('Adding new scan.')
policies = get_policies()
policy_id = policies['Basic Network Scan']
scan_data = add('Test Scan', 'Create a new scan with API', '192.168.1.1', policy_id)
scan_id = scan_data['id']
# print('Updating scan with new targets.')
update(scan_id, scan_data['name'], scan_data['description'], ip)
print(ip)
# print('Launching new scan.')
scan_uuid = launch(scan_id)
history_ids = get_history_ids(scan_id)
history_id = history_ids[scan_uuid]
while status(scan_id, history_id) != 'completed':
time.sleep(5)
# print('Exporting the completed scan.')
file_id = export(scan_id, history_id)
filename = download(scan_id, file_id)
file_puginid = extract(filename)
if flag == 1:
get_vul_detail(file_puginid)
主函數解釋如下:
- 連接ElasticSearch數據庫,並從庫中獲取所有IP(可以不用一次獲取所有IP,只需修改size大小即可,若size = 5即表示一次掃描數據庫中前5個IP),用一個list保存,並實現了對掃描過的IP不進行重復掃描的功能(由於每次掃描都是從前往后開始,故用IP.txt保存掃描過的IP以判斷是否掃描過該IP)。
- 循環對每一個IP發起一次掃描,policy為Basic Network Scan,scaner為Local Scaner,用download函數及extract函數獲取plugin_id(此處為獲取漏洞總資產故進行了去重處理),並根據plugin_id用get_vul_detail函數(根據attributes字典列表)獲取漏洞詳情信息。
最后得到end.txt文本文件,存放的是掃描結果。
將end.txt另存為json格式,再將結果寫回數據庫。
漏洞分析
數據庫中部分漏洞資產如下圖
以第四個漏洞為例作簡單的分析:
- 漏洞描述:Coils from a Modicon field device, such as a PLC, RTU, or IED, can be read using function code 1.
- 漏洞名稱:Modbus/TCP Coil Access
- 漏洞等級:Medium
- 漏洞發布時間:2006/12/11
- CVE編號:CVE-2000-1200
- 解決方案:Restrict access to the Modbus port (TCP/502) to authorized Modbus clients.
- 漏洞詳情:Using function code 1, Modbus can reads the coils in a Modbus slave, which is commonly used by SCADA and DCS field devices. Coils refer to the binary output settings and are typically mapped to actuators. A sample of coil settings read from the device are provided by the plugin output. The ability to read coils may help an attacker profile a system and identify ranges of registers to alter via a write coil message.
即ModBus網絡是一個工業通信系統,由帶智能終端的可編程序控制器和計算機通過公用線路或局部專用線路連接而成。其系統結構既包括硬件、亦包括軟件。它可應用於各種數據采集和過程監控。該漏洞CVE編號為CVE-2000-1200,名稱為Modbus/TCP線圈接入,是指來自Modicon現場設備(如PLC、RTU或IED)的線圈可以使用功能代碼1讀取。可通過將對Modbus端口(TCP/502)的訪問限制為授權的Modbus客戶端來解決。使用功能代碼1,Modbus可以讀取Modbus從站中的線圈,這是SCADA和DCS現場設備常用的。線圈是指二進制輸出設置,通常映射到執行器。插件輸出提供了從設備讀取的線圈設置示例。讀取線圈的能力可能有助於攻擊者分析系統並通過寫入線圈消息識別要更改的寄存器范圍。
總結
利用自動化掃描,可以得到大量物聯網設備與漏洞的對應關系。即可以通過數據庫查詢得到所有物聯網設備基本信息及其所有漏洞信息,也可以通過數據庫查詢得知哪些物聯網設備有相同的漏洞。便於研究人員快速檢索分析。實現這二者的對應關系之后,還可以進行漏洞預警和資產分布統計。當新的漏洞被曝光之后,可以利用數據庫中的信息快速統計分析,找到全球全國受漏洞影響的資產分布,以便快速應對。
