探測網絡設備ACL規則
背景:在互聯網企業的生產網絡中,往往在網絡入口處的網絡設備上會有成千上萬條ACL策略,這么多的ACL導致了網絡管理員很難徹底梳理清楚其中的邏輯關系,從而不知道到底對外開放了哪些IP和哪些端口。
解決手段:編寫ACL規則探測程序,從公網掃描該網絡設備的ACL規則
工作原理:不管是交換機還是路由器或防火牆,在處理數據包時ACL規則總是優先於ICMP規則。即:當網絡設備收到一個TTL為0的報文時會先匹配ACL規則之后再向發送者發送 ICMP time exceeded消息,基於此原理就可以在公網發送以IDC內地址為目的IP且TTL到被探測設備時剛好減為0的數據包,如果被探測設備返回了ICMP time exceeded消息則說明它的ACL策略針對此IP及port開放,如果沒有返回包則說明數據包被它的ACL阻攔
圖示:
程序實現語言:python3
源碼:
1 # coding:utf-8
2
3 from itertools import groupby 4 from scapy.all import *
5 import re 6 import sys 7 import IPy 8
9
10 class RangeException(Exception): 11 pass
12
13
14 class InputType(Exception): 15 pass
16
17
18 class TargetNotSupport(Exception): 19 pass
20
21
22 class OptionError(Exception): 23 pass
24
25
26 class PortScan(object): 27 def __init__(self, speed=3): 28 self.open_port = [] 29 self.speed = speed 30
31 def __str__(self): 32 speed_statement = '使用PortScan(*)創建對象時可以在*處指定掃描速率,默認為3,數值越小掃描速度越快\n' \ 33 '注意:隨着掃描速度的增加准確率會相應降低!'
34 return speed_statement 35
36 # 從本地文件讀取IP資源
37 def __target(self): 38 try: 39 open_file = input('請輸入要導入資源的文件名字:') 40 address_file = open(open_file, 'r') 41 address_list = [] 42 for i in address_file.readlines(): 43 i = i.replace('\n', '') 44 address_list.append(i) 45 except FileNotFoundError: 46 print('\n') 47 print('請先在本地創建對應名字的IP列表文本文件!!!') 48 print('\n') 49 self.scan() 50
51 except KeyboardInterrupt: 52 print('') 53 sys.exit() 54
55 except Exception as error: 56 print('打開本地文件有誤!!!') 57 print(error) 58 self.scan() 59 else: 60 return address_list 61
62 # 獲取IP資源
63 # 輸入1從一個文件讀取IP,輸入2從屏幕輸入獲取IP
64 # 獲取的IP信息可以是單個IP地址(例:220.12.12.12),也可以是一個地址段(例:192.168.1.0/24)
65 # 最終返回一個IP地址列表,此列表包含了輸入的所有單個IP地址以及地址段中的可用IP
66 def get_ip(self, option, string): 67
68 address_store = [] # IP資源存儲
69
70 # 如果選1則從文件讀取IP資源
71 if option == 1: 72 # 得到打開IP表文件名字及其IP表
73 address_list = self.__target() 74
75 # 如果選2則手動輸入IP資源
76 if option == 2: 77 # 接收IP數據
78 address_list = input(string) # 1.1.1.1,2.2.2.0/24
79 address_list = address_list.split(',') 80
81 # 1.1.1.1/24 的正則
82 ip_range_re = r'( *(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \ 83 r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \ 84 r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \ 85 r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d))/' \ 86 r'(3[012]|[12][0-9]|[1-9]) *'
87 # 1.1.1.1,2.2.2.2,3.3.3.3 的正則
88 ip_address_re = r'( *(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \ 89 r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \ 90 r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \ 91 r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d))'
92
93 # 對輸入的值進行便利,提取其中的IP地址
94 for i in address_list: 95 range_re_result = re.match(ip_range_re, i) # 1.1.1.1/24的正則匹配結果
96 ip_re_result = re.match(ip_address_re, i) # 1.1.1.1,2.2.2.2,3.3.3.3 的正則匹配結果
97 if range_re_result: 98 subnet_mask = range_re_result.group(6) 99 network_number = re.sub(r'^0+', '', range_re_result.group(1)) 100 address_string = network_number + '/' + subnet_mask # 如果輸入1.1.1.1/24類型則address_store為字符串
101 try: 102 # 提取網段內所有可用IP地址並加表
103 address_subset = IPy.IP(address_string) 104 for i in address_subset: 105 if i == address_subset[len(address_subset)-1] or i == address_subset[0]: 106 continue
107 else: 108 address_store.append(str(i)) 109 except ValueError: 110 print('輸入有誤,請按"網絡號/掩碼"或"IP地址"格式輸入') 111 self.scan() 112
113 elif ip_re_result: 114 ii = re.sub(r'^0+', '', ip_re_result.group()) 115 address_store.append(ii) #對單個IP地址形式的輸入直接加表
116 else: 117 print('輸入有誤,請輸入正確的IP地址(e.g:1.1.1.1,192.168.1.0/24)!!!') 118 self.get_ip(option) 119 return address_store 120
121 # 通過從屏幕輸入獲取端口資源
122 # 輸入形式可以為單個端口號(例:3389),也可以是一個端口范圍(例:22-25)
123 # 返回數據為一個列表,其中每個元素都以元組形式存在. 每個元組包含兩個整數元素,第一個為端口范圍的最小值,第二個為端口范圍的最大值
124 # 注意: 單個端口號形式的輸入最后也將以范圍形式輸出,其最大值與最小值都為他本身
125 # 返回數據舉例: [(22-25),(3389,3389)]
126 def get_port(self): 127 port_range = input('請輸入要掃描端口范圍(e.g: 3389,20-25):') 128 target_port = [] # 端口資源存儲
129 try: 130 port_range = port_range.split(',') # 例:['1', '2', '3-10', '11-20']
131 for i in port_range: 132 if re.match(' *(\d+)-(\d+).*', i): 133 # for ii in range(len(open_port_list)):
134 low_port = int(re.match(' *(\d+)-(\d+).*', i).group(1)) 135 high_port = int(re.match(' *(\d+)-(\d+).*', i).group(2)) 136 if low_port >= high_port or low_port <= 0 or low_port > 65535 or high_port <= 0 or high_port > 65535: 137 raise RangeException 138 else: 139 target_port.append((low_port, high_port)) # 如果是范圍則把最小值和最大值以元組形式加表
140 elif re.match(' *\d+ *', i): 141 singular = int(re.match(' *(\d+) *', i).group(1)) 142 if 0 < singular <= 65535: 143 target_port.append((singular, singular)) # 如果是單整數則把它當作范圍一樣處理,最大值和最小值均為它自己
144 else: 145 raise RangeException 146 else: 147 raise InputType 148 except RangeException: 149 print('端口應為1-65535之間的整數,且輸入范圍格式應當為從小到大') 150 self.get_port() 151 except InputType: 152 print('端口類型應為整數') 153 self.get_port() 154 except KeyboardInterrupt: 155 print('') 156 sys.exit() 157 except Exception as unusual: 158 print('輸入有誤!') 159 print(unusual) 160 self.get_port() 161 return target_port # 返回經過處理的目標端口列表
162
163 # 對純數字的列表進行排序且范圍切塊
164 # 例:導入[11,22,33,1,2,3,4,5]----->導出[1-5,11,22,33]
165 @staticmethod 166 def int_single_to_range(original): 167 original.sort() # 先排序
168 open_port_range = [] 169 fun = lambda x: x[1] - x[0] 170 for k, g in groupby(enumerate(original), fun): 171 l1 = [j for i, j in g] # 連續數字的列表
172 if len(l1) > 1: 173 scop = str(min(l1)) + '-' + str(max(l1)) # 將連續數字范圍用"-"連接
174 else: 175 scop = l1[0] 176 open_port_range.append("{}".format(scop)) 177 return open_port_range 178
179 # TTL自動檢測
180 # 導入一個被探測設備IP列表,返回一個被探測設備IP與相應TTL的字典,例:{'220.2.2.2':15}
181 def ttl_check(self, address_list): 182 print('准備中...') 183 probe_device_ttl = {} 184 # switch = 0 # 檢測返回數據包的源IP是否為被探測設備
185 try: 186 for i in address_list: 187 for ii in range(1, 129): 188 print(i, ii) 189 scan_packet = IP(dst=i, ttl=ii) / TCP(dport=8080, flags='S') 190 ttl_source = sr1(scan_packet, timeout=3, verbose=False) 191 #while 1:
192 # time.sleep(0.001)
193 if ttl_source: 194 try: 195 if ttl_source['IP'].fields['src'] == i: 196 probe_device_ttl[i] = ii 197 # switch = 1
198 break
199 else: 200 continue
201 except Exception as receive_error: 202 print(receive_error) 203 raise
204 # if switch == 1:
205 # break
206 else: 207 print('TTL超時!!!') 208
209 except KeyboardInterrupt: 210 print('') 211 sys.exit() 212
213 except Exception as error: 214 print('程序出現錯誤!!!') 215 print(error) 216 self.scan() 217 else: 218 print('准備完畢') 219 return probe_device_ttl 220
221 @staticmethod 222 def option(): 223 print('請選擇導入被掃描信息方式:\n'
224 '1 從文件導入\n'
225 '2 在程序中手動輸入\n') 226
227 def scan(self): 228 # 功能選擇
229 self.option() 230 try: 231 option = int(input('我選擇: ')) 232 print(option) 233 if option != 2 and option != 1: 234 raise OptionError 235 except OptionError: 236 print('請輸入功能標號!') 237 self.scan() 238
239 # 獲取要掃描IP列表
240 address_store = self.get_ip(option, '請輸入被探測IP資源:') 241
242 # 獲取要掃描的端口列表
243 port_range = self.get_port() 244
245 probe_device = self.get_ip(2, '請輸入被探測的安全設備IP地址:') 246
247 # 自動檢測到探測設備的TTL值,該值為一個字典,key為被探測安全設備IP,value為到該設備的TTL值
248 ttl = self.ttl_check(probe_device) 249
250 count = 0 # 用作進度百分比的分子. 以每個IP的每個端口為單位進行計數,總數為IP個數*端口個數
251
252 if ttl: 253 # 挨個兒朝被探測設備發送端口探測包
254 for probe_device_ip, ttl in ttl.items(): 255
256 print(probe_device_ip + '端口開放情況:') 257
258 # 創建一個新文件,准備導入結果
259 write_file = open(probe_device_ip + '-result.txt', 'w') 260
261 try: 262 # 為每個被探測設備計算IP資源池中所有的IP資源
263 for i in address_store: 264
265 # 為每個IP計算各個輸入IP端口范圍開放情況
266 for port in port_range: 267 (low_port, high_port) = port 268 scan_packets = IP(dst=i, ttl=ttl) / TCP(dport=(low_port, high_port), flags='S') # 構造檢測包
269 replay_packets_total = sr(scan_packets, timeout=self.speed, verbose=False) # 發送檢測包及接收返回包
270 open_port_list = replay_packets_total[0].res # 開放端口原始對象列表(一個IP不同端口范圍回包的集合)
271
272 # 一個IP有幾個端口開放就有幾個回包(如果端口被ACL干掉則不會回包),以下遍歷回包來讀取開放的端口
273 for ii in range(len(open_port_list)): 274 try: 275 if open_port_list[ii][1]['ICMP'].fields['type'] == 11: # ICMP類型為11時為TTL超時包
276 self.open_port.append(open_port_list[ii][0]['TCP'].fields['dport']) # TTL超時則為開放端口,將開放端口進行加表
277 continue
278 else: 279 if open_port_list[ii][1]['ICMP'].fields['type'] == 3: # 不知為啥有時候會返回類型為3的ICMP包(即:端口不可達包)
280 continue
281 else: 282 # 除11和3外其他類型的ICMP回包,需進行人工排查
283 print('ICMP返回類型不對') 284 print(open_port_list[ii][1]['ICMP'].fields) 285 print(open_port_list[ii][0]['TCP'].fields) 286 except IndexError: 287
288 # 如果探測設備IP剛好為要掃描的IP時,開放端口會返回SYN,ACK包
289 if open_port_list[ii][1]['TCP'].fields['flags'] == 'SA': 290 self.open_port.append(open_port_list[ii][0]['TCP'].fields['dport']) 291 continue
292
293 # 不知為啥有時候交換機會返回RST ACK的包
294 if open_port_list[ii][1]['TCP'].fields['flags'] == 'RA': 295 continue
296 else: 297 print('返回未知TCP包,需人工分析') 298 print(open_port_list[ii][1]['TCP'].fields, 299 open_port_list[ii][1]['TCP'].fields['flags']) 300 print(open_port_list[ii]) 301
302 count += 1 # 執行進度+1(每計算完一個IP進度+1)
303 print(count) 304
305 # 進度統計
306 speed_to_progress = count / len(address_list) * len(port_range) * len(ttl) * 100
307 print('\r已完成:%.2f%% ' % speed_to_progress, end='') 308
309 self.open_port = self.int_single_to_range(self.open_port) # 對開放端口列表進行排序和范圍化
310 print('針對' + i + '開放端口: ', self.open_port) 311 write_file.write(str(i) + ':' + str(self.open_port) + '\n') # 每掃描完一個IP就把該IP結果寫入文件
312 self.open_port = [] # 掃尾工作,為下個IP掃描准備一個干凈的開放端口列表
313
314 write_file.close() 315
316 except KeyboardInterrupt: 317 print('') 318 write_file.close() 319 sys.exit() 320 except Exception as error: 321 write_file.close() 322 print('程序異常退出!') 323 print(error) 324 else: 325 write_file.close() 326 print('') 327 if option == 1: 328 print('被探測設備%s已完成,結果已導入當前路徑''\'%s\'''文件中' % (probe_device_ip, probe_device_ip + '-result.txt')) 329 if option == 2: 330 print('掃描已完成!') 331
332
333 if __name__ == '__main__': 334
335 def banner(): 336 print('\n') 337 print('============================================') 338 print('\n') 339 print('\n') 340 print(' ACL有效性探測系統v1.0 ') 341 print('\n') 342 print('\n') 343 print('============================================') 344 print('\n') 345
346 def main(): 347 banner() 348 a = PortScan() 349 a.scan() 350
351 main()