1. Topic Background
Q&A:
Why did I choose this topic?
There are currently very few open-source tools of this kind, and I wanted to make my own information gathering more convenient. That is why I chose it as my final assignment; it also lets me improve my skills and my programming mindset.
What are the expected goals?
To make information gathering easier for myself, and to detect the status of websites.
Project background
Introduction to the FOFA cyberspace search engine:
FOFA is a cyberspace asset search engine launched by Baimaohui (白帽汇). It helps users quickly match network assets and speeds up follow-up work, for example analyzing the scope of a vulnerability's impact, compiling application-distribution statistics, and ranking application popularity.
Put simply, it is a home-grown, enhanced Shodan: it can show how a given product is deployed across the Internet, enumerate all subdomain sites under a root domain, identify the company behind an IP, find the other sites sharing an IP with a given subdomain site, scan the whole Internet for a vulnerability, and gauge the Internet-wide impact of a newly disclosed one.
2. Topic-Based Web Crawler Design
- Name: crawler-based URL collection and detection
- Content: crawl the search results of the FOFA cyberspace engine, then detect the status of each website.
- Design description:
Description: the tool is designed in three parts: URL collection, URL format adjustment, and website status detection.
Approach: first, use the FOFA API together with the json module for data processing and requests for crawling; next, use the open() function to process the URLs; finally, use the status_code of a requests response to detect each site's status (see the sketch below).
Technical difficulties: file handling, filtering the crawled URLs, URL format processing, and threading.
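As a minimal illustration of that final stage: the status detection ultimately boils down to reading the status_code attribute of a requests response. A small sketch (the target URL is just a placeholder):

import requests

# Minimal sketch of the status check; http://example.com is a placeholder target
res = requests.get("http://example.com", timeout=5)
print(res.status_code)  # e.g. 200 when the site responds normally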
3. Structural Analysis of the Home Page
The home page is a standard search engine interface:
Structural analysis:
The search box:
The left-hand display area of the page is also a div tag:
The overall right-hand display area is a div tag:
The content to be crawled:
Locating the target information:
Finding the node tags:
soup.find(name="input", attrs={"id": "total_entries"})
soup.find_all(name="div", attrs={"class": "list_mod"})
PS: to locate the nodes, this time I actually used the API and then processed the JSON.
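For reference, a minimal sketch of how those two BeautifulSoup lookups would be wired together if the result page were scraped directly (illustrative only: the result URL is an assumption, and as noted above the real tool uses the API instead):

import requests
from bs4 import BeautifulSoup

# Hypothetical scraping path, not used in the final tool
html = requests.get("https://fofa.so/result?qbase64=...", timeout=5).text
soup = BeautifulSoup(html, "html.parser")

total = soup.find(name="input", attrs={"id": "total_entries"})    # total number of hits
entries = soup.find_all(name="div", attrs={"class": "list_mod"})  # one div per search result
print(total, len(entries))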
4. Web Crawler Program Analysis
Structure:
URL collection:
Configuration file name: fofaConfig.yaml

email:  # account email
key:    # the account's API key

Search: 'app="泛微-协同办公OA" && country="CN" && is_domain="true"'  # FOFA query describing what to search for

File name: fofa.py
# encoding: utf-8
import base64
import json

import requests
import yaml
from requests.packages.urllib3.exceptions import InsecureRequestWarning

# Ignore HTTPS certificate warnings
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)


# Read the configuration file
def getFofaconfig():
    with open("fofaConfig.yaml", 'r', encoding='utf-8') as file:
        file_data = file.read()  # keep the raw configuration text
    # Parse the YAML configuration
    return yaml.load(file_data, Loader=yaml.FullLoader)


# Print the ip:port pairs extracted from the API results
def IptoHostname(result):
    url = []
    for i in result['results']:
        url.append(str(i[1] + ':' + i[2]))
    print(url)


def getTarget():
    useremail = str(getFofaconfig().get('email'))
    userkey = str(getFofaconfig().get('key'))
    # Base64-encode the FOFA query as the API requires
    Search = getFofaconfig().get('Search')
    select = base64.b64encode(Search.encode('UTF-8'))
    select = str(select, 'UTF-8')
    fofa_url = "https://fofa.so/api/v1/search/all?email=" + useremail + "&key=" + userkey + "&qbase64=" + select
    # Fetch the results and write them to ip.txt
    try:
        res = requests.get(fofa_url)
        result = json.loads(res.text)
        IptoHostname(result)
        count = 0
        with open('ip.txt', 'w') as targets:
            for i in result['results']:
                targets.write(i[1] + ':' + i[2] + '\n')
                print(i[1])
                count += 1
        print("搜索结果有" + str(count) + "条,已保存")
    except Exception as e:
        print(e)


if __name__ == '__main__':
    getTarget()
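The indexing i[1] and i[2] above leans on the shape of the FOFA API response: by default, results is a list of [host, ip, port] triples. An illustrative (made-up) example of what json.loads(res.text) returns, showing only the fields the script touches:

# Illustrative shape of the FOFA API JSON; all values are made up
result = {
    "error": False,
    "size": 2,
    "results": [
        ["example1.com", "1.2.3.4", "80"],    # [host, ip, port]
        ["example2.com", "5.6.7.8", "8080"],
    ],
}
# i[1] + ':' + i[2] therefore yields "1.2.3.4:80", "5.6.7.8:8080", ...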
Run screenshot:
Result:
URL format processing:
File name: alter_url.py
# Read the collected ip:port list
with open('ip.txt', 'r') as f:
    urls_data = [data.strip().strip('\\') for data in f]


# Prefix every entry with http://
def add_http():
    for item in urls_data:
        a = 'http://' + item
        with open('url.txt', 'a') as url:
            url.write(a + '\n')


# Append common ports to each entry (enable as needed)
# def add_port():
#     # common ports to try
#     port_dict = {1080, 1883, 3306, 443, 50070, 5222, 7001, 80, 8080, 8081, 81, 8443, 8686, 9000, 9092, 9200, 9999, 53, 9100}
#     for item in port_dict:
#         for i in urls_data:
#             with open('test.txt', 'a') as url:
#                 url.write(i + ':' + str(item) + '\n')
#             print(i + ':' + str(item))


# Call the functions the task requires
if __name__ == '__main__':
    add_http()
    # add_port()
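A quick illustration of this stage with made-up data: if ip.txt contains the lines on the left, add_http() appends the lines on the right to url.txt:

ip.txt              url.txt
1.2.3.4:80     ->   http://1.2.3.4:80
5.6.7.8:8080   ->   http://5.6.7.8:8080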
Run screenshot:
Result:
URL status detection:
File name: url_status.py
# -*- coding:utf-8 -*-
import io
import logging
import re
import threading
import time
from concurrent.futures import ThreadPoolExecutor

import requests
import urllib3
from requests.packages.urllib3.exceptions import InsecureRequestWarning

logging.captureWarnings(True)
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

start = time.time()
lock = threading.Lock()

# Write the CSV header
with open("result.csv", "a") as f:
    f.write("源地址" + "," + "跳转地址" + "," + "状态码" + "," + "标题" + '\n')

# Load the URL list
with open('url.txt', 'r') as f:
    urls_data = [data.strip().strip('\\') for data in f]

print(urls_data)


# Fetch a URL and record its status code and page title
def get_title(url, timeout=5):
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36',
    }
    try:
        res = requests.get(url, headers=header, verify=False, allow_redirects=True, timeout=timeout)
        code = res.status_code
    except Exception:
        code = "无法访问"

    # Serialize console and file output across threads
    with lock:
        code1 = str(code)
        if code1 != "无法访问":
            try:
                urllib3.disable_warnings()
                # Fetch again to decode the body and pull out the <title> text
                res = requests.get(url, headers=header, verify=False, allow_redirects=True, timeout=timeout)
                res.encoding = res.apparent_encoding
                title = re.findall(r"(?<=<title>)(?:.|\n)+?(?=<)", res.text, re.IGNORECASE)[0].strip()
            except Exception:
                title = "[ ]"
            print(url + "," + res.url + "," + code1 + "," + title)
            with io.open("result.csv", "a", encoding='utf-8') as f2:
                f2.writelines(url + "," + res.url + "," + code1 + "," + title + '\n')
        else:
            title = " "
            print(url + "," + " " + "," + code1 + "," + title)
            with open("result.csv", "a") as f2:
                f2.writelines(url + "," + " " + "," + code1 + "," + title + '\n')


# Check the URLs concurrently with a thread pool
with ThreadPoolExecutor(max_workers=50) as executor:
    for urls in urls_data:
        executor.submit(get_title, url=urls)

end = time.time()
print("总耗时:", end - start, "秒")
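One caveat in the script above: a page title containing a comma will corrupt the hand-rolled CSV row. A sketch of how the writes could be hardened with Python's standard csv module (a suggested improvement, not part of the original tool; write_row is a hypothetical helper):

import csv
import io

# Hypothetical helper: csv.writer quotes fields that contain commas
def write_row(src_url, final_url, status, title):
    # newline='' avoids blank lines on Windows; utf-8 matches the rest of the file
    with io.open("result.csv", "a", encoding="utf-8", newline="") as f2:
        csv.writer(f2).writerow([src_url, final_url, status, title])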
Run screenshot:
Result:
Complete code (the three stages combined into a single script):

# -*- coding: utf-8 -*-
# Complete program, run in order: URL collection -> URL formatting -> status detection
import base64
import io
import json
import logging
import re
import threading
import time
from concurrent.futures import ThreadPoolExecutor

import requests
import urllib3
import yaml
from requests.packages.urllib3.exceptions import InsecureRequestWarning

logging.captureWarnings(True)
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

start = time.time()
lock = threading.Lock()


# ---------- Stage 1: URL collection ----------

# Read the configuration file
def getFofaconfig():
    with open("fofaConfig.yaml", 'r', encoding='utf-8') as file:
        file_data = file.read()
    return yaml.load(file_data, Loader=yaml.FullLoader)


# Print the ip:port pairs extracted from the API results
def IptoHostname(result):
    url = []
    for i in result['results']:
        url.append(str(i[1] + ':' + i[2]))
    print(url)


def getTarget():
    useremail = str(getFofaconfig().get('email'))
    userkey = str(getFofaconfig().get('key'))
    # Base64-encode the FOFA query as the API requires
    Search = getFofaconfig().get('Search')
    select = base64.b64encode(Search.encode('UTF-8'))
    select = str(select, 'UTF-8')
    fofa_url = "https://fofa.so/api/v1/search/all?email=" + useremail + "&key=" + userkey + "&qbase64=" + select
    try:
        res = requests.get(fofa_url)
        result = json.loads(res.text)
        IptoHostname(result)
        count = 0
        with open('ip.txt', 'w') as targets:
            for i in result['results']:
                targets.write(i[1] + ':' + i[2] + '\n')
                print(i[1])
                count += 1
        print("搜索结果有" + str(count) + "条,已保存")
    except Exception as e:
        print(e)


# ---------- Stage 2: URL formatting ----------

# Prefix every collected ip:port entry with http://
def add_http():
    with open('ip.txt', 'r') as f:
        ip_data = [data.strip().strip('\\') for data in f]
    for item in ip_data:
        with open('url.txt', 'a') as url:
            url.write('http://' + item + '\n')


# ---------- Stage 3: URL status detection ----------

# Fetch a URL and record its status code and page title
def get_title(url, timeout=5):
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36',
    }
    try:
        res = requests.get(url, headers=header, verify=False, allow_redirects=True, timeout=timeout)
        code = res.status_code
    except Exception:
        code = "无法访问"

    # Serialize console and file output across threads
    with lock:
        code1 = str(code)
        if code1 != "无法访问":
            try:
                urllib3.disable_warnings()
                res = requests.get(url, headers=header, verify=False, allow_redirects=True, timeout=timeout)
                res.encoding = res.apparent_encoding
                title = re.findall(r"(?<=<title>)(?:.|\n)+?(?=<)", res.text, re.IGNORECASE)[0].strip()
            except Exception:
                title = "[ ]"
            print(url + "," + res.url + "," + code1 + "," + title)
            with io.open("result.csv", "a", encoding='utf-8') as f2:
                f2.writelines(url + "," + res.url + "," + code1 + "," + title + '\n')
        else:
            title = " "
            print(url + "," + " " + "," + code1 + "," + title)
            with open("result.csv", "a") as f2:
                f2.writelines(url + "," + " " + "," + code1 + "," + title + '\n')


def check_status():
    # Write the CSV header
    with open("result.csv", "a") as f:
        f.write("源地址" + "," + "跳转地址" + "," + "状态码" + "," + "标题" + '\n')
    with open('url.txt', 'r') as f:
        urls_data = [data.strip().strip('\\') for data in f]
    # Check the URLs concurrently with a thread pool
    with ThreadPoolExecutor(max_workers=200) as executor:
        for urls in urls_data:
            executor.submit(get_title, url=urls)


if __name__ == '__main__':
    getTarget()
    add_http()
    check_status()
    end = time.time()
    print("总耗时:", end - start, "秒")
5. Summary
My motivation for writing this tool was that too little of this kind of code is open source; too few people share their work these days, so I have published this tool on an open-source platform. During the design process I picked up new techniques, such as using an API and processing the JSON it returns, and I gained a new programming mindset. What still needs improvement is my lack of experience: to implement some features I had to look up a large number of methods, debug them piece by piece, and only then merge them into the overall code.
© Xiao0yan. All rights reserved.