1. Topic Background
Q&A:
Why did I choose this topic?
There are currently few open-source tools of this kind, and I wanted to make my own information gathering more convenient. That is why I chose this topic for my final assignment; it is also a chance to improve my skill level and programming mindset.
What goals should it achieve?
To make my own information gathering easier, and to check the status of websites.
Project background
Introduction to the FOFA cyberspace search engine:
FOFA is a cyberspace asset search engine released by Baimaohui (白帽匯). It helps users match network assets quickly and speeds up follow-up work, such as analysing the scope of a vulnerability's impact, collecting application distribution statistics, and ranking application popularity.
Put simply, it is a home-grown, enhanced Shodan: it can show how a product is deployed across the internet, list every subdomain site under a root domain, identify the company behind an IP, find the other sites sharing an IP with a given subdomain site, scan the whole internet for a vulnerability, and gauge the internet-wide impact of a newly disclosed one.
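For illustration, typical searches in FOFA's query syntax look like the lines below (the `app` and `country` fields appear in this project's own config; the other values are placeholders of mine):

```
app="泛微-協同辦公OA" && country="CN"   # deployments of one product in China
domain="example.com"                    # all subdomain sites of a root domain
ip="93.184.216.34"                      # every site hosted on one IP
```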
2. Thematic Web Crawler Design
- Name: crawler-based URL collection and status checking
- Content: crawl search results from the FOFA cyberspace engine, then check each site's status.
- Design description:
Description: the tool is designed in three parts: URL collection, URL formatting, and site status checking.
Approach: first, use the FOFA API with the json module for data handling and requests for fetching; next, use the open() function to process the URL list; finally, use the status code returned by requests to check each site (see the sketch after this list).
Technical difficulties: file handling, filtering the crawled URL content, URL formatting, and threading.
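A minimal sketch of the three-stage flow (the function names here are placeholders of mine; the real implementations are fofa.py, alter_url.py, and url_status.py in Section 4):

```python
# Sketch only: three stages chained together, mirroring the design above.
def collect_urls(query):
    """Stage 1 (fofa.py): query the FOFA API, return 'ip:port' strings."""
    return []

def format_urls(raw):
    """Stage 2 (alter_url.py): prepend http:// so requests can fetch them."""
    return ['http://' + r for r in raw]

def check_status(urls):
    """Stage 3 (url_status.py): fetch each URL, record status code and title."""
    for u in urls:
        pass

if __name__ == '__main__':
    check_status(format_urls(collect_urls('app="..." && country="CN"')))
```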
3. Structural Analysis of the Home Page
The home page is a standard search-engine interface:
Structural features:
Structure analysis:
Search box:
The left-hand display area of the page is also made of div tags:
The right-hand display area is a div tag:
The content to be crawled:
Locating the target information:
Node/tag lookup:
```python
soup.find(name="input", attrs={"id": "total_entries"})
soup.find_all(name="div", attrs={"class": "list_mod"})
```
PS: for this project I did not scrape these nodes; instead I used the API and processed its JSON response.
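Since the tool reads the API's JSON rather than parsing HTML, the parsing amounts to the following (the sample below is a made-up response shaped like the API's default host, ip, port fields; real values will differ):

```python
import json

# Hypothetical sample shaped like a FOFA API response; values are fake.
sample = '''{"error": false, "size": 2,
 "results": [["example.com", "93.184.216.34", "80"],
             ["test.example.com", "93.184.216.34", "443"]]}'''

data = json.loads(sample)
for host, ip, port in data["results"]:
    print(ip + ":" + port)  # the same i[1] + ':' + i[2] pairing used in fofa.py
```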
4. Crawler Program Analysis
Structure:
URL collection:
Configuration file name: fofaConfig.yaml
```yaml
email:    # account email
key:      # account API key

Search: 'app="泛微-協同辦公OA" && country="CN"&&is_domain="true"'  # targets to check, in FOFA query syntax
```
File name: fofa.py
```python
# encoding: utf-8
import base64
import json

import requests
import yaml
from requests.packages.urllib3.exceptions import InsecureRequestWarning

# Suppress warnings for unverified HTTPS requests
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)


# Read and parse the configuration file
def getFofaconfig():
    with open("fofaConfig.yaml", 'r', encoding='utf-8') as file:
        file_data = file.read()  # keep the raw YAML in a variable
    Userdata = yaml.load(file_data, Loader=yaml.FullLoader)
    return Userdata


# Print the results as ip:port pairs
def IptoHostname(result):
    url = []
    for i in result['results']:
        url.append(str(i[1] + ':' + i[2]))
    print(url)


def getTarget():
    useremail = str(getFofaconfig().get('email'))
    userkey = str(getFofaconfig().get('key'))
    # Fetch target sites from FOFA; the query must be base64-encoded
    Search = getFofaconfig().get('Search')
    select = base64.b64encode(Search.encode('UTF-8'))
    select = str(select, 'UTF-8')
    fofa_url = "https://fofa.so/api/v1/search/all?email=" + useremail + "&key=" + userkey + "&qbase64=" + select
    # Write the results to ip.txt
    try:
        res = requests.get(fofa_url)
        result = json.loads(res.text)
        IptoHostname(result)
        count = 0
        with open('ip.txt', 'w') as targets:
            for i in result['results']:
                targets.write(i[1] + ':' + i[2] + '\n')
                print(i[1])
                count += 1
        print("Got " + str(count) + " results, saved")
    except Exception as e:
        print(e)


if __name__ == '__main__':
    getTarget()
```
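A small design note: getTarget() builds the query string by hand, which works, but requests can also encode the parameters itself. A hedged alternative sketch (same endpoint, same fields):

```python
import base64
import requests

def fetch_results(email, key, search):
    # Sketch only: requests builds and URL-encodes the query string for us.
    params = {
        "email": email,
        "key": key,
        "qbase64": base64.b64encode(search.encode("UTF-8")).decode("UTF-8"),
    }
    res = requests.get("https://fofa.so/api/v1/search/all", params=params)
    return res.json()  # equivalent to json.loads(res.text)
```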
Run screenshot:
Result:
URL formatting:
File name: alter_url.py
```python
# Read the collected ip:port list
with open('ip.txt', 'r') as f:
    urls_data = [data.strip().strip('\\') for data in f]


# Prepend an http:// scheme to each target
def add_http():
    for item in urls_data:
        a = 'http://' + item
        # print(a)
        with open('url.txt', 'a') as url:
            url.write(a + '\n')


# Append common ports to each URL
# def add_port():
#     # Commonly used ports
#     port_dict = {1080, 1883, 3306, 443, 50070, 5222, 7001, 80, 8080, 8081, 81, 8443, 8686, 9000, 9092, 9200, 9999, 53, 9100}
#     for item in port_dict:
#         for i in urls_data:
#             with open('test.txt', 'a') as url:
#                 url.write(i + ':' + str(item) + '\n')
#             print(i + ':' + str(item))


# Call whichever functions the task needs
if __name__ == '__main__':
    add_http()
    # add_port()
```
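One possible refinement (my own suggestion, not part of the original tool): FOFA results can repeat a host, so deduplicating before writing keeps url.txt clean:

```python
# Sketch: write each unique target once, preserving first-seen order.
def add_http_unique(targets, out_path='url.txt'):
    seen = set()
    with open(out_path, 'a') as out:
        for item in targets:
            if item not in seen:
                seen.add(item)
                out.write('http://' + item + '\n')
```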
Run screenshot:
Result:
URL status checking:
File name: url_status.py
```python
# -*- coding:utf-8 -*-
import io
import logging
import re
import threading
import time
from concurrent.futures import ThreadPoolExecutor

import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning

logging.captureWarnings(True)
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

start = time.time()
lock = threading.Lock()

# Write the CSV header row
with open("result.csv", "a") as f:
    f.write("Source URL,Final URL,Status Code,Title" + '\n')

# Read the URL list
with open('url.txt', 'r') as f:
    urls_data = [data.strip().strip('\\') for data in f]

print(urls_data)


# Fetch a URL and record its status code and title
def get_title(url, timeout=5):
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36',
    }
    try:
        res = requests.get(url, headers=header, verify=False, allow_redirects=True, timeout=timeout)
        code = res.status_code
    except Exception:
        code = "unreachable"

    with lock:
        code1 = str(code)
        if code1 != "unreachable":
            try:
                # Fetch again to decode the page and pull out its <title>
                res = requests.get(url, headers=header, verify=False, allow_redirects=True, timeout=timeout)
                res.encoding = res.apparent_encoding
                title = re.findall(r"(?<=<title>)(?:.|\n)+?(?=<)", res.text, re.IGNORECASE)[0].strip()
            except Exception:
                title = "[ ]"
            print(url + "," + res.url + "," + code1 + "," + title)
            with io.open("result.csv", "a", encoding='utf-8') as f2:
                f2.writelines(url + "," + res.url + "," + code1 + "," + title + '\n')
        else:
            title = " "
            print(url + "," + " " + "," + code1 + "," + title)
            with open("result.csv", "a") as f2:
                f2.writelines(url + "," + " " + "," + code1 + "," + title + '\n')


# Check all URLs with a thread pool
with ThreadPoolExecutor(max_workers=50) as executor:
    for urls in urls_data:
        executor.submit(get_title, url=urls)

end = time.time()
print("Total time:", end - start, "seconds")
```
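One caveat worth noting (my own observation, not part of the original design): a page title that contains a comma will break the hand-built CSV rows. Python's standard csv module quotes such fields automatically; a minimal sketch:

```python
import csv

# Sketch: csv.writer quotes fields containing commas or quotes, so a
# title like "Home, sweet home" stays in a single column.
def write_row(path, source_url, final_url, code, title):
    with open(path, "a", newline="", encoding="utf-8") as fh:
        csv.writer(fh).writerow([source_url, final_url, code, title])

write_row("result_demo.csv", "http://example.com", "http://example.com/", "200", "Home, sweet home")
```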
Run screenshot:
Result:
Full code:
```python
# -*- coding: utf-8 -*-
import base64
import io
import json
import logging
import re
import threading
import time
from concurrent.futures import ThreadPoolExecutor

import requests
import yaml
from requests.packages.urllib3.exceptions import InsecureRequestWarning

logging.captureWarnings(True)
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

lock = threading.Lock()


# ---- Part 1: URL collection (fofa.py) ----

def getFofaconfig():
    # Read and parse the YAML configuration
    with open("fofaConfig.yaml", 'r', encoding='utf-8') as file:
        file_data = file.read()
    Userdata = yaml.load(file_data, Loader=yaml.FullLoader)
    return Userdata


def IptoHostname(result):
    # Print the results as ip:port pairs
    url = []
    for i in result['results']:
        url.append(str(i[1] + ':' + i[2]))
    print(url)


def getTarget():
    useremail = str(getFofaconfig().get('email'))
    userkey = str(getFofaconfig().get('key'))
    # The FOFA query string must be base64-encoded
    Search = getFofaconfig().get('Search')
    select = str(base64.b64encode(Search.encode('UTF-8')), 'UTF-8')
    fofa_url = "https://fofa.so/api/v1/search/all?email=" + useremail + "&key=" + userkey + "&qbase64=" + select
    try:
        res = requests.get(fofa_url)
        result = json.loads(res.text)
        IptoHostname(result)
        count = 0
        with open('ip.txt', 'w') as targets:
            for i in result['results']:
                targets.write(i[1] + ':' + i[2] + '\n')
                print(i[1])
                count += 1
        print("Got " + str(count) + " results, saved")
    except Exception as e:
        print(e)


# ---- Part 2: URL formatting (alter_url.py) ----

def add_http():
    # Prepend http:// to every collected ip:port target
    with open('ip.txt', 'r') as f:
        targets = [data.strip().strip('\\') for data in f]
    with open('url.txt', 'a') as url:
        for item in targets:
            url.write('http://' + item + '\n')


# ---- Part 3: URL status checking (url_status.py) ----

def get_title(url, timeout=5):
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36',
    }
    try:
        res = requests.get(url, headers=header, verify=False, allow_redirects=True, timeout=timeout)
        code = res.status_code
    except Exception:
        code = "unreachable"

    with lock:
        code1 = str(code)
        if code1 != "unreachable":
            try:
                # Fetch again to decode the page and pull out its <title>
                res = requests.get(url, headers=header, verify=False, allow_redirects=True, timeout=timeout)
                res.encoding = res.apparent_encoding
                title = re.findall(r"(?<=<title>)(?:.|\n)+?(?=<)", res.text, re.IGNORECASE)[0].strip()
            except Exception:
                title = "[ ]"
            print(url + "," + res.url + "," + code1 + "," + title)
            with io.open("result.csv", "a", encoding='utf-8') as f2:
                f2.writelines(url + "," + res.url + "," + code1 + "," + title + '\n')
        else:
            title = " "
            print(url + "," + " " + "," + code1 + "," + title)
            with open("result.csv", "a") as f2:
                f2.writelines(url + "," + " " + "," + code1 + "," + title + '\n')


def check_all():
    start = time.time()
    with open("result.csv", "a") as f:
        f.write("Source URL,Final URL,Status Code,Title\n")
    with open('url.txt', 'r') as f:
        urls_data = [data.strip().strip('\\') for data in f]
    # Probe every URL with a thread pool
    with ThreadPoolExecutor(max_workers=50) as executor:
        for urls in urls_data:
            executor.submit(get_title, url=urls)
    print("Total time:", time.time() - start, "seconds")


if __name__ == '__main__':
    getTarget()   # Part 1: collect ip:port pairs into ip.txt
    add_http()    # Part 2: turn them into http:// URLs in url.txt
    check_all()   # Part 3: probe each URL and record results in result.csv
```
5. Summary
My motivation for writing this tool was that too few tools like it are open source; too few people share their work nowadays, so I have published this tool on an open-source platform. During the design process I learned new techniques, such as using an API and processing its JSON. I also gained new ways of thinking about programming. What still needs improvement is my lack of experience: to implement certain features I had to look up many methods, debug small pieces, and only then merge them into the whole.
© Xiao0yan. All rights reserved.