如何探測網絡設備ACL規則


 

探測網絡設備ACL規則

 

背景:在互聯網企業的生產網絡中,往往在網絡入口處的網絡設備上會有成千上萬條ACL策略,這么多的ACL導致了網絡管理員很難徹底梳理清楚其中的邏輯關系,從而不知道到底對外開放了哪些IP和哪些端口。

 

解決手段:編寫ACL規則探測程序,從公網掃描該網絡設備的ACL規則

 

工作原理:不管是交換機還是路由器或防火牆,在處理數據包時ACL規則總是優先於ICMP規則。即:當網絡設備收到一個TTL為0的報文時會先匹配ACL規則之后再向發送者發送 ICMP time exceeded消息,基於此原理就可以在公網發送以IDC內地址為目的IP且TTL到被探測設備時剛好減為0的數據包,如果被探測設備返回了ICMP time exceeded消息則說明它的ACL策略針對此IP及port開放,如果沒有返回包則說明數據包被它的ACL阻攔

 

圖示:

程序實現語言:python3

 

源碼:

 1 # coding:utf-8
 2 
 3 from itertools import groupby  4 from scapy.all import *
 5 import re  6 import sys  7 import IPy  8 
 9 
 10 class RangeException(Exception):  11     pass
 12 
 13 
 14 class InputType(Exception):  15     pass
 16 
 17 
 18 class TargetNotSupport(Exception):  19     pass
 20 
 21 
 22 class OptionError(Exception):  23     pass
 24 
 25 
 26 class PortScan(object):  27     def __init__(self, speed=3):  28         self.open_port = []  29         self.speed = speed  30 
 31     def __str__(self):  32         speed_statement = '使用PortScan(*)創建對象時可以在*處指定掃描速率,默認為3,數值越小掃描速度越快\n' \  33                           '注意:隨着掃描速度的增加准確率會相應降低!'
 34         return speed_statement  35 
 36     # 從本地文件讀取IP資源
 37     def __target(self):  38         try:  39             open_file = input('請輸入要導入資源的文件名字:')  40             address_file = open(open_file, 'r')  41             address_list = []  42             for i in address_file.readlines():  43                 i = i.replace('\n', '')  44  address_list.append(i)  45         except FileNotFoundError:  46             print('\n')  47             print('請先在本地創建對應名字的IP列表文本文件!!!')  48             print('\n')  49  self.scan()  50 
 51         except KeyboardInterrupt:  52             print('')  53  sys.exit()  54 
 55         except Exception as error:  56             print('打開本地文件有誤!!!')  57             print(error)  58  self.scan()  59         else:  60             return address_list  61 
 62     # 獲取IP資源
 63     # 輸入1從一個文件讀取IP,輸入2從屏幕輸入獲取IP
 64     # 獲取的IP信息可以是單個IP地址(例:220.12.12.12),也可以是一個地址段(例:192.168.1.0/24)
 65     # 最終返回一個IP地址列表,此列表包含了輸入的所有單個IP地址以及地址段中的可用IP
 66     def get_ip(self, option, string):  67 
 68         address_store = []  # IP資源存儲
 69 
 70         # 如果選1則從文件讀取IP資源
 71         if option == 1:  72             # 得到打開IP表文件名字及其IP表
 73             address_list = self.__target()  74 
 75         # 如果選2則手動輸入IP資源
 76         if option == 2:  77             # 接收IP數據
 78             address_list = input(string)   # 1.1.1.1,2.2.2.0/24
 79             address_list = address_list.split(',')  80 
 81         # 1.1.1.1/24 的正則
 82         ip_range_re = r'( *(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \  83                       r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \  84                       r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \  85                       r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d))/' \  86                       r'(3[012]|[12][0-9]|[1-9]) *'
 87         # 1.1.1.1,2.2.2.2,3.3.3.3 的正則
 88         ip_address_re = r'( *(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \  89                         r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \  90                         r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.' \  91                         r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d))'
 92 
 93         # 對輸入的值進行便利,提取其中的IP地址
 94         for i in address_list:  95             range_re_result = re.match(ip_range_re, i)    # 1.1.1.1/24的正則匹配結果
 96             ip_re_result = re.match(ip_address_re, i)    # 1.1.1.1,2.2.2.2,3.3.3.3 的正則匹配結果
 97             if range_re_result:  98                 subnet_mask = range_re_result.group(6)  99                 network_number = re.sub(r'^0+', '', range_re_result.group(1)) 100                 address_string = network_number + '/' + subnet_mask   # 如果輸入1.1.1.1/24類型則address_store為字符串
101                 try: 102                     # 提取網段內所有可用IP地址並加表
103                     address_subset = IPy.IP(address_string) 104                     for i in address_subset: 105                         if i == address_subset[len(address_subset)-1] or i == address_subset[0]: 106                             continue
107                         else: 108  address_store.append(str(i)) 109                 except ValueError: 110                     print('輸入有誤,請按"網絡號/掩碼"或"IP地址"格式輸入') 111  self.scan() 112 
113             elif ip_re_result: 114                 ii = re.sub(r'^0+', '', ip_re_result.group()) 115                 address_store.append(ii)      #對單個IP地址形式的輸入直接加表
116             else: 117                 print('輸入有誤,請輸入正確的IP地址(e.g:1.1.1.1,192.168.1.0/24)!!!') 118  self.get_ip(option) 119         return address_store 120 
121     # 通過從屏幕輸入獲取端口資源
122     # 輸入形式可以為單個端口號(例:3389),也可以是一個端口范圍(例:22-25)
123     # 返回數據為一個列表,其中每個元素都以元組形式存在. 每個元組包含兩個整數元素,第一個為端口范圍的最小值,第二個為端口范圍的最大值
124     # 注意: 單個端口號形式的輸入最后也將以范圍形式輸出,其最大值與最小值都為他本身
125     # 返回數據舉例: [(22-25),(3389,3389)]
126     def get_port(self): 127         port_range = input('請輸入要掃描端口范圍(e.g: 3389,20-25):') 128         target_port = []  # 端口資源存儲
129         try: 130             port_range = port_range.split(',')   # 例:['1', '2', '3-10', '11-20']
131             for i in port_range: 132                 if re.match(' *(\d+)-(\d+).*', i): 133                     # for ii in range(len(open_port_list)):
134                     low_port = int(re.match(' *(\d+)-(\d+).*', i).group(1)) 135                     high_port = int(re.match(' *(\d+)-(\d+).*', i).group(2)) 136                     if low_port >= high_port or low_port <= 0 or low_port > 65535 or high_port <= 0 or high_port > 65535: 137                         raise RangeException 138                     else: 139                         target_port.append((low_port, high_port))   # 如果是范圍則把最小值和最大值以元組形式加表
140                 elif re.match(' *\d+ *', i): 141                     singular = int(re.match(' *(\d+) *', i).group(1)) 142                     if 0 < singular <= 65535: 143                         target_port.append((singular, singular))   # 如果是單整數則把它當作范圍一樣處理,最大值和最小值均為它自己
144                     else: 145                         raise RangeException 146                 else: 147                     raise InputType 148         except RangeException: 149             print('端口應為1-65535之間的整數,且輸入范圍格式應當為從小到大') 150  self.get_port() 151         except InputType: 152             print('端口類型應為整數') 153  self.get_port() 154         except KeyboardInterrupt: 155             print('') 156  sys.exit() 157         except Exception as unusual: 158             print('輸入有誤!') 159             print(unusual) 160  self.get_port() 161         return target_port  # 返回經過處理的目標端口列表
162 
163     # 對純數字的列表進行排序且范圍切塊
164     # 例:導入[11,22,33,1,2,3,4,5]----->導出[1-5,11,22,33]
165  @staticmethod 166     def int_single_to_range(original): 167         original.sort()  # 先排序
168         open_port_range = [] 169         fun = lambda x: x[1] - x[0] 170         for k, g in groupby(enumerate(original), fun): 171             l1 = [j for i, j in g]  # 連續數字的列表
172             if len(l1) > 1: 173                 scop = str(min(l1)) + '-' + str(max(l1))  # 將連續數字范圍用"-"連接
174             else: 175                 scop = l1[0] 176             open_port_range.append("{}".format(scop)) 177         return open_port_range 178 
179     # TTL自動檢測
180     # 導入一個被探測設備IP列表,返回一個被探測設備IP與相應TTL的字典,例:{'220.2.2.2':15}
181     def ttl_check(self, address_list): 182         print('准備中...') 183         probe_device_ttl = {} 184         # switch = 0 # 檢測返回數據包的源IP是否為被探測設備
185         try: 186             for i in address_list: 187                 for ii in range(1, 129): 188                     print(i, ii) 189                     scan_packet = IP(dst=i, ttl=ii) / TCP(dport=8080, flags='S') 190                     ttl_source = sr1(scan_packet, timeout=3, verbose=False) 191                     #while 1:
192                     # time.sleep(0.001)
193                     if ttl_source: 194                         try: 195                             if ttl_source['IP'].fields['src'] == i: 196                                 probe_device_ttl[i] = ii 197                                 # switch = 1
198                                 break
199                             else: 200                                 continue
201                         except Exception as receive_error: 202                             print(receive_error) 203                             raise
204                     # if switch == 1:
205                     # break
206                 else: 207                     print('TTL超時!!!') 208 
209         except KeyboardInterrupt: 210             print('') 211  sys.exit() 212 
213         except Exception as error: 214             print('程序出現錯誤!!!') 215             print(error) 216  self.scan() 217         else: 218             print('准備完畢') 219             return probe_device_ttl 220 
221  @staticmethod 222     def option(): 223         print('請選擇導入被掃描信息方式:\n'
224               '1 從文件導入\n'
225               '2 在程序中手動輸入\n') 226 
227     def scan(self): 228         # 功能選擇
229  self.option() 230         try: 231             option = int(input('我選擇: ')) 232             print(option) 233             if option != 2 and option != 1: 234                 raise OptionError 235         except OptionError: 236             print('請輸入功能標號!') 237  self.scan() 238 
239         # 獲取要掃描IP列表
240         address_store = self.get_ip(option, '請輸入被探測IP資源:') 241 
242         # 獲取要掃描的端口列表
243         port_range = self.get_port() 244 
245         probe_device = self.get_ip(2, '請輸入被探測的安全設備IP地址:') 246 
247         # 自動檢測到探測設備的TTL值,該值為一個字典,key為被探測安全設備IP,value為到該設備的TTL值
248         ttl = self.ttl_check(probe_device) 249 
250         count = 0  # 用作進度百分比的分子. 以每個IP的每個端口為單位進行計數,總數為IP個數*端口個數
251 
252         if ttl: 253             # 挨個兒朝被探測設備發送端口探測包
254             for probe_device_ip, ttl in ttl.items(): 255 
256                 print(probe_device_ip + '端口開放情況:') 257 
258                 # 創建一個新文件,准備導入結果
259                 write_file = open(probe_device_ip + '-result.txt', 'w') 260 
261                 try: 262                     # 為每個被探測設備計算IP資源池中所有的IP資源
263                     for i in address_store: 264 
265                         # 為每個IP計算各個輸入IP端口范圍開放情況
266                         for port in port_range: 267                             (low_port, high_port) = port 268                             scan_packets = IP(dst=i, ttl=ttl) / TCP(dport=(low_port, high_port), flags='S')  # 構造檢測包
269                             replay_packets_total = sr(scan_packets, timeout=self.speed, verbose=False)  # 發送檢測包及接收返回包
270                             open_port_list = replay_packets_total[0].res  # 開放端口原始對象列表(一個IP不同端口范圍回包的集合)
271 
272                             # 一個IP有幾個端口開放就有幾個回包(如果端口被ACL干掉則不會回包),以下遍歷回包來讀取開放的端口
273                             for ii in range(len(open_port_list)): 274                                 try: 275                                     if open_port_list[ii][1]['ICMP'].fields['type'] == 11:  # ICMP類型為11時為TTL超時包
276                                         self.open_port.append(open_port_list[ii][0]['TCP'].fields['dport'])  # TTL超時則為開放端口,將開放端口進行加表
277                                         continue
278                                     else: 279                                         if open_port_list[ii][1]['ICMP'].fields['type'] == 3:  # 不知為啥有時候會返回類型為3的ICMP包(即:端口不可達包)
280                                             continue
281                                         else: 282                                             # 除11和3外其他類型的ICMP回包,需進行人工排查
283                                             print('ICMP返回類型不對') 284                                             print(open_port_list[ii][1]['ICMP'].fields) 285                                             print(open_port_list[ii][0]['TCP'].fields) 286                                 except IndexError: 287 
288                                     # 如果探測設備IP剛好為要掃描的IP時,開放端口會返回SYN,ACK包
289                                     if open_port_list[ii][1]['TCP'].fields['flags'] == 'SA': 290                                         self.open_port.append(open_port_list[ii][0]['TCP'].fields['dport']) 291                                         continue
292 
293                                     # 不知為啥有時候交換機會返回RST ACK的包
294                                     if open_port_list[ii][1]['TCP'].fields['flags'] == 'RA': 295                                         continue
296                                     else: 297                                         print('返回未知TCP包,需人工分析') 298                                         print(open_port_list[ii][1]['TCP'].fields, 299                                               open_port_list[ii][1]['TCP'].fields['flags']) 300                                         print(open_port_list[ii]) 301 
302                                 count += 1  # 執行進度+1(每計算完一個IP進度+1)
303                                 print(count) 304 
305                                 # 進度統計
306                                 speed_to_progress = count / len(address_list) * len(port_range) * len(ttl) * 100
307                                 print('\r已完成:%.2f%% ' % speed_to_progress, end='') 308 
309                         self.open_port = self.int_single_to_range(self.open_port)  # 對開放端口列表進行排序和范圍化
310                         print('針對' + i + '開放端口: ', self.open_port) 311                         write_file.write(str(i) + ':' + str(self.open_port) + '\n')  # 每掃描完一個IP就把該IP結果寫入文件
312                         self.open_port = []  # 掃尾工作,為下個IP掃描准備一個干凈的開放端口列表
313 
314  write_file.close() 315 
316                 except KeyboardInterrupt: 317                     print('') 318  write_file.close() 319  sys.exit() 320                 except Exception as error: 321  write_file.close() 322                     print('程序異常退出!') 323                     print(error) 324                 else: 325  write_file.close() 326                     print('') 327                     if option == 1: 328                         print('被探測設備%s已完成,結果已導入當前路徑''\'%s\'''文件中' % (probe_device_ip, probe_device_ip + '-result.txt')) 329                     if option == 2: 330                         print('掃描已完成!') 331 
332 
333 if __name__ == '__main__': 334 
335     def banner(): 336         print('\n') 337         print('============================================') 338         print('\n') 339         print('\n') 340         print(' ACL有效性探測系統v1.0 ') 341         print('\n') 342         print('\n') 343         print('============================================') 344         print('\n') 345 
346     def main(): 347  banner() 348         a = PortScan() 349  a.scan() 350 
351     main()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM