1. 簡介
很多童鞋都會有這樣一個需求,我想要掃描特定網段並需要知道未使用和已使用的IP有哪些,甚至需要將其做統計,那這時候用Python去實現IP段掃描就會比較的輕松,當前文中我是將數據保存到mongo中,這里的代碼只做參考,需要根據實際的場景進行修改!!
2. 代碼實現
import time
import IPy
from concurrent.futures import ThreadPoolExecutor
import subprocess
from pymongo import MongoClient
class ip_check(object):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 數據庫相關
self.client = MongoClient('127.0.0.1', 27017)
self.db = self.client['devops']
# self.save_dbname = "snmp_ping_result" # 存儲歷史記錄
self.store_db = 'snmp_ping'
# 存放IP地址
self.ip_allocated = [] # 已分配/已使用的IP
self.ip_not_allocated = [] # 未分配/未使用的IP
self.ip_error = [] # 訪問失敗或異常的IP
# icmp相關(ping底層用的就是icmp協議)
self.icmp_timeout = 5 # 超時時間
self.icmp_count = 5 # 發送次數
# 並發相關
self.is_thread = True # 是否多線程(建議開啟,否則檢測速度異常的慢)
self.max_thread_workers = 100 # 最大線程池數
def run(self):
self.get_result()
def get_result(self):
# 存放最終返回的結果
# result = {}
# 獲取網段地址
segment = '172.16.58.0/24'
db = self.db[self.store_db]
# todo 從數據庫中查詢對應segment的數據,但是這里注意,如果不需要指定 segment 來查詢的話,可以直接查詢到所有IP數據,然后通過for循環的方式一個一個網段的ping
data = db.find_one({'segment': segment})
if data is None:
raise Exception('網段不存在')
_ips = IPy.IP(data.get('segment')) # IPy 獲取所有IP地址
# todo 獲取該網段已使用的IP 一般這里是從數據庫中查詢
exist_ip = data.get('used_ip')
# 過濾已存在的IP
ips = [str(ip) for ip in _ips if str(ip) not in exist_ip]
"""並發執行ping"""
max_workers = self.max_thread_workers if self.max_thread_workers else 20
if self.is_thread:
with ThreadPoolExecutor(max_workers) as executor:
for ip in ips:
executor.submit(self.send_ping, ip)
else:
for ip in ips:
self.send_ping(ip)
"""統計IP數量"""
# 只有當 總ip數量 = 已使用IP + 剩余IP + 異常IP 才能確保所有IP都被掃描到了
if len(ips) == (len(self.ip_allocated) + len(self.ip_not_allocated) + len(self.ip_error)):
total_ip, used_ip, available_ip, err_ip = len(ips), len(self.ip_allocated), len(self.ip_not_allocated), len(
self.ip_error)
print("""
總IP:%s
已使用IP: %s
剩余IP:%s
故障的IP:%s
""" % (total_ip, used_ip, available_ip, err_ip))
else:
# todo 之后會增加日志記錄或者是報異常
...
# todo 最后需要將結果存儲到數據庫中
save_result = {
'$set': {
'total_ip': len(ips),
'used_ip': self.ip_allocated,
'available_ip': self.ip_not_allocated,
'err_ip': self.ip_error,
}
}
db.update_one(data, save_result)
"""處理異常的IP"""
# todo 保留,可以對異常的IP再次發起二次ping或者是其他處理
def send_ping(self, ip):
"""
發送ping命令,執行IP掃描
:param ip: 字符串形式的IP地址,例如:'172.16.2.12'
:return:
"""
if ip:
try:
pipe = subprocess.run(f"ping -c {self.icmp_count} -t {self.icmp_timeout} {ip} > /dev/null 2>&1",
shell=True)
code = pipe.returncode # 獲取執行狀態碼 ping通:返回0,ping不通:返回 2(非0)
if code == 0:
self.ip_allocated.append(ip) # 已使用的IP
elif code != 0 or code == 2:
self.ip_not_allocated.append(ip) # 未使用的IP
except Exception as e:
# icmp不可達的記錄為異常IP
print(f'ip:{ip} 訪問異常: {e}')
self.ip_error.append(ip) # 異常IP
else:
# todo 之后會增加日志記錄或者是報異常
...
if __name__ == "__main__":
x = ip_check()
x.run()
數據庫表結構: