基於爬蟲進行url采集檢測


 

一、選題背景

  Qa:

  我為什么要選擇此選題呢?

    目前開源代碼很少這類型的工具,為了方便自己在信息收集方面的便利。所以才選擇此題作為期末作業,同時也提升自己的水平、編程思維。

  達到什么預期目標?

   便於自己信息信息收集,以及對網站的狀態檢測。

  項目背景

    fofa網絡空間引擎介紹:

       FOFA是白帽匯推出的一款網絡空間資產搜索引擎。它能夠幫助用戶迅速進行網絡資產匹配、加快后續工作進程。例如進行漏洞影響范圍分析、應用分布統計、應用流行度排名統計等。
     簡單理解就是,一個本土加強版shodan,知道某產品在互聯網的部署情況、獲取一個根域名所有子域名網站、根據IP確認企業、根據一個子域名網站找到跟他在一個IP的其他網站、全網漏洞掃描、一個新的漏洞全網的。

 

二、主題式網絡爬蟲設計方案

  1.  名稱:基於爬蟲進行url采集檢測
  2.  內容:爬取fofa網絡空間引擎搜索的內容,然后進行網站狀態的檢測。
  3.  設計方案描述:

描述:本工具分為三部分進行設計,url采集、url格式修改、網站狀態檢測。

思路:首先使用json,接口進行數據處理、requests進行網頁爬取,其次使用open()函數進行url 的處理,最后使用。requests.status進行狀態的檢測。

技術難點:文件處理、url爬取篩選內容、url格式處理、線程。

 

三、主頁體面的結構特征分析

  首頁是一個正常的搜索引擎界面:

   

 

 

 

從結構特征:

 

 

 

 

  是動態網站,css+js。

結構分析:

  搜索框:

   

  網頁左側顯示部分也是div標簽:

   

  右側整體顯示部分div標簽:

   

  所爬取的內容:

   

  定位目標信息:

   

  節點標簽查找:

   soup.find(name="input", attrs={"id": "total_entries"})
   soup.find_all(name="div", attrs={"class": "list_mod"})

    PS:查找結點這次我使用的是接口然后處理json。

四、網絡爬蟲程序分析

  結構:

   

url采集:

配置文件文件名:fofaConfig.yaml

1 email:     #用戶名郵箱
2 key:   #用戶名的key
3 
4 Search: 'app="泛微-協同辦公OA" && country="CN"&&is_domain="true"'    #fofa語法搜索檢測的內容 

 

文件名:fofa.py

 1 # encoding: utf-8
 2 import json
 3 import requests
 4 from requests.packages.urllib3.exceptions import InsecureRequestWarning
 5 import base64
 6 import yaml
 7 
 8 # 忽略https安全請求警告
 9 requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
10 # 結果
11 result = []
12 
13 
14 # 獲取配置信息
15 def getFofaconfig():
16     file = open("fofaConfig.yaml", 'r', encoding='utf-8')
17     file_date = file.read()  # 配置文件數據保存到變量中
18     file.close()
19     # 處理配置文件
20     Userdata = yaml.load(file_date, Loader=yaml.FullLoader)
21     return Userdata
22 
23 
24 def IptoHostname(result):
25     url =[]
26     j = 1
27     for i in result['results']:
28         url.append(str(i[1]+':'+i[2]))
29     print(url)
30 
31 
32 
33 def getTarget():
34     useremail = str(getFofaconfig().get('email'))
35     userkey = str(getFofaconfig().get('key'))
36     # fofa獲取站點
37     Search = getFofaconfig().get('Search')
38     select = base64.b64encode(Search.encode('UTF-8'))
39     select = str(select, 'UTF-8')
40     fofa_url = "https://fofa.so/api/v1/search/all?email=" + useremail + "&key=" + userkey + "&qbase64=" + select
41     #寫入數據
42     try:
43         res = requests.get(fofa_url)
44         result = json.loads(res.text)
45         IptoHostname(result)
46         count = 0
47         with open('ip.txt', 'w') as targets:
48             for i in result['results']:
49                 targets.write(i[1] + ':' + i[2] + '\n')
50                 print(i[1])
51                 count += 1
52             print("搜索結果有" + str(count) + "條,已保存")
53 
54 
55     except Exception as e:
56         print(e)
57 
58 if __name__ == '__main__':
59     getTarget() 

 

運行截圖:

   

結果:

  

 

   

 

 

 

url格式處理:

文件名:alter_url.py

 

 1 with open('ip.txt', 'r') as f:
 2     urls_data = [data.strip().strip('\\') for data in f]
 3 
 4 # 添加http://頭
 5 def add_http():
 6     for itme in  urls_data:
 7         a = 'http://' + itme
 8         # print(a)
 9 
10         with open('url.txt','a') as url:
11             url.write(a + '\n')
12             pass
13 
14 # 在url后添加端口
15 # def add_port():
16 #     # 常用端口修改
17 #     port_dict={1080,1883,3306,443,50070,5222,7001,80,8080,8081,81,8443,8686,9000,9092,9200,9999,53,9100}
18 #     for itme in port_dict:
19 #         for i in urls_data:
20 #             with open('test.txt','a') as url:
21 #                 url.write(i +':'+str(itme) + '\n')
22 #                 print(i +':'+str(itme))
23 
24 # 根據需求功能進行添加
25 if __name__ == '__main__':
26      add_http()
27     # add_port() 

 

運行截圖:

   

結果:

   

 

 

    

url狀態檢測:

文件名:url_status.py

 

 1 # -*- coding:utf-8 -*-
 2 import re
 3 import io
 4 import requests
 5 import urllib3
 6 import logging
 7 from concurrent.futures import ThreadPoolExecutor
 8 import time
 9 import threading
10 from requests.packages.urllib3.exceptions import InsecureRequestWarning
11 
12 logging.captureWarnings(True)
13 requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
14 
15 start = time.time()
16 lock = threading.Lock()
17 
18 f = open("result.csv", "a")
19 f.write("源地址" + "," + "跳轉地址" + "," + "狀態碼" + "," + "標題" + '\n')
20 f = f.close()
21 
22 # 獲取url列表
23 with open('url.txt', 'r') as f:
24     urls_data = [data.strip().strip('\\') for data in f]
25 
26 print(urls_data)
27 # 獲取狀態碼、標題
28 def get_title(url, timeout=5):
29     header = {
30         'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36',
31     }
32     try:
33         res = requests.get(url, headers=header, verify=False, allow_redirects=True, timeout=timeout)
34         code = res.status_code
35     except Exception as error:
36         code = "無法訪問"
37 
38     with lock:
39         code1 = str(code)
40         if code1 != "無法訪問":
41             try:
42                 urllib3.disable_warnings()
43                 res = requests.get(url, headers=header, verify=False, allow_redirects=True, timeout=timeout)
44                 res.encoding = res.apparent_encoding
45                 title = re.findall("(?<=\<title\>)(?:.|\n)+?(?=\<)", res.text, re.IGNORECASE)[0].strip()
46             except:
47                 title = "[ ]"
48             print(url + "," + res.url + "," + code1 + "," + title)
49             with io.open("result.csv", "a", encoding='utf-8') as f2:
50                 f2.writelines(url + "," + res.url + "," + code1 + "," + title + '\n')
51 
52         else:
53             title = " "
54             print(url + "," + " " + "," + code1 + "," + title)
55             with open("result.csv", "a") as f2:
56                 f2.writelines(url + "," + " " + "," + code1 + "," + title + '\n')
57 
58 
59 # 多線程
60 with ThreadPoolExecutor(max_workers=50) as executor:
61     for urls in urls_data:
62         executor.submit(
63             get_title, url=urls
64         )
65 
66 end = time.time()
67 print("總耗時:", end - start, "") 

 

運行截圖:

    

結果:

    

 

總代碼: 

 

  1 import json
  2 import requests
  3 from requests.packages.urllib3.exceptions import InsecureRequestWarning
  4 import base64
  5 import yaml
  6 # encoding: utf-8
  7 
  8 requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
  9 
 10 result = []
 11 
 12 
 13 
 14 def getFofaconfig():
 15     file = open("fofaConfig.yaml", 'r', encoding='utf-8')
 16     file_date = file.read()  
 17     file.close()
 18 
 19     Userdata = yaml.load(file_date, Loader=yaml.FullLoader)
 20     return Userdata
 21 
 22 
 23 def IptoHostname(result):
 24     url =[]
 25     j = 1
 26     for i in result['results']:
 27         url.append(str(i[1]+':'+i[2]))
 28     print(url)
 29 
 30 
 31 
 32 def getTarget():
 33     useremail = str(getFofaconfig().get('email'))
 34     userkey = str(getFofaconfig().get('key'))
 35 
 36     Search = getFofaconfig().get('Search')
 37     select = base64.b64encode(Search.encode('UTF-8'))
 38     select = str(select, 'UTF-8')
 39     fofa_url = "https://fofa.so/api/v1/search/all?email=" + useremail + "&key=" + userkey + "&qbase64=" + select
 40     try:
 41         res = requests.get(fofa_url)
 42         result = json.loads(res.text)
 43         IptoHostname(result)
 44         count = 0
 45         with open('ip.txt', 'w') as targets:
 46             for i in result['results']:
 47                 targets.write(i[1] + ':' + i[2] + '\n')
 48                 print(i[1])
 49                 count += 1
 50             print("搜索結果有" + str(count) + "條,已保存")
 51 
 52 
 53     except Exception as e:
 54         print(e)
 55 
 56 if __name__ == '__main__':
 57     getTarget()
 58 
 59 
 60 
 61 
 62 with open('ip.txt', 'r') as f:
 63     urls_data = [data.strip().strip('\\') for data in f]
 64 
 65 # 添加http://頭
 66 def add_http():
 67     for itme in  urls_data:
 68         a = 'http://' + itme
 69         # print(a)
 70 
 71         with open('url.txt','a') as url:
 72             url.write(a + '\n')
 73             pass
 74 
 75 # 在url后添加端口
 76 # def add_port():
 77 #     # 常用端口修改
 78 #     port_dict={1080,1883,3306,443,50070,5222,7001,80,8080,8081,81,8443,8686,9000,9092,9200,9999,53,9100}
 79 #     for itme in port_dict:
 80 #         for i in urls_data:
 81 #             with open('test.txt','a') as url:
 82 #                 url.write(i +':'+str(itme) + '\n')
 83 #                 print(i +':'+str(itme))
 84 
 85 # 根據需求功能進行添加
 86 if __name__ == '__main__':
 87      add_http()
 88     # add_port()
 89 
 90 # -*- coding:utf-8 -*-
 91 import re
 92 import io
 93 import requests
 94 import urllib3
 95 import logging
 96 from concurrent.futures import ThreadPoolExecutor
 97 import time
 98 import threading
 99 from requests.packages.urllib3.exceptions import InsecureRequestWarning
100 
101 logging.captureWarnings(True)
102 requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
103 
104 start = time.time()
105 lock = threading.Lock()
106 
107 f = open("result.csv", "a")
108 f.write("源地址" + "," + "跳轉地址" + "," + "狀態碼" + "," + "標題" + '\n')
109 f = f.close()
110 
111 # 獲取url列表
112 with open('url.txt', 'r') as f:
113     urls_data = [data.strip().strip('\\') for data in f]
114 
115 
116 # 獲取狀態碼、標題
117 def get_title(url, timeout=5):
118     header = {
119         'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36',
120     }
121     try:
122         res = requests.get(url, headers=header, verify=False, allow_redirects=True, timeout=timeout)
123         code = res.status_code
124     except Exception as error:
125         code = "無法訪問"
126 
127     with lock:
128         code1 = str(code)
129         if code1 != "無法訪問":
130             try:
131                 urllib3.disable_warnings()
132                 res = requests.get(url, headers=header, verify=False, allow_redirects=True, timeout=timeout)
133                 res.encoding = res.apparent_encoding
134                 title = re.findall("(?<=\<title\>)(?:.|\n)+?(?=\<)", res.text, re.IGNORECASE)[0].strip()
135             except:
136                 title = "[ ]"
137             print(url + "," + res.url + "," + code1 + "," + title)
138             with io.open("result.csv", "a", encoding='GBK') as f2:
139                 f2.writelines(url + "," + res.url + "," + code1 + "," + title + '\n')
140 
141         else:
142             title = " "
143             print(url + "," + " " + "," + code1 + "," + title)
144             with open("result.csv", "a") as f2:
145                 f2.writelines(url + "," + " " + "," + code1 + "," + title + '\n')
146 
147 
148 # 多線程
149 with ThreadPoolExecutor(max_workers=200) as executor:
150     for urls in urls_data:
151         executor.submit(
152             get_title, url=urls
153         )
154 
155 end = time.time()
156 print("總耗時:", end - start, "")

 

 

 

五、總結

寫工具的初衷就是開源的太少了,在現如今工作中太少人進行共享了,我已經把自己寫的這套工具分享在開源平台上。在設計過程中,學習到了新技術,例如如使用接口進行json的處理。同時也收獲了新的編程思維,需要改進的地方就是經驗不足了,如滿足某些功能時需要去查閱大量的方法進行小部分調試然后再能匯總到代碼整體。

 

 

                                      ©歸Xiao0yan所有。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM