Python 實現IP段掃描


1. 簡介

很多童鞋都會有這樣一個需求,我想要掃描特定網段並需要知道未使用和已使用的IP有哪些,甚至需要將其做統計,那這時候用Python去實現IP段掃描就會比較的輕松,當前文中我是將數據保存到mongo中,這里的代碼只做參考,需要根據實際的場景進行修改!!

2. 代碼實現

import time
import IPy
from concurrent.futures import ThreadPoolExecutor
import subprocess
from pymongo import MongoClient


class ip_check(object):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # 數據庫相關
        self.client = MongoClient('127.0.0.1', 27017)
        self.db = self.client['devops']

        # self.save_dbname = "snmp_ping_result"  # 存儲歷史記錄
        self.store_db = 'snmp_ping'
        # 存放IP地址
        self.ip_allocated = []  # 已分配/已使用的IP
        self.ip_not_allocated = []  # 未分配/未使用的IP
        self.ip_error = []  # 訪問失敗或異常的IP

        # icmp相關(ping底層用的就是icmp協議)
        self.icmp_timeout = 5  # 超時時間
        self.icmp_count = 5  # 發送次數

        # 並發相關
        self.is_thread = True  # 是否多線程(建議開啟,否則檢測速度異常的慢)
        self.max_thread_workers = 100  # 最大線程池數

    def run(self):
        self.get_result()

    def get_result(self):
        # 存放最終返回的結果
        # result = {}

        # 獲取網段地址
        segment = '172.16.58.0/24'
        db = self.db[self.store_db]
        # todo 從數據庫中查詢對應segment的數據,但是這里注意,如果不需要指定 segment 來查詢的話,可以直接查詢到所有IP數據,然后通過for循環的方式一個一個網段的ping
        data = db.find_one({'segment': segment})
        if data is None:
            raise Exception('網段不存在')

        _ips = IPy.IP(data.get('segment'))  # IPy 獲取所有IP地址

        # todo 獲取該網段已使用的IP 一般這里是從數據庫中查詢
        exist_ip = data.get('used_ip')

        # 過濾已存在的IP
        ips = [str(ip) for ip in _ips if str(ip) not in exist_ip]

        """並發執行ping"""
        max_workers = self.max_thread_workers if self.max_thread_workers else 20
        if self.is_thread:
            with ThreadPoolExecutor(max_workers) as executor:
                for ip in ips:
                    executor.submit(self.send_ping, ip)
        else:
            for ip in ips:
                self.send_ping(ip)
                
        """統計IP數量"""
        # 只有當 總ip數量 = 已使用IP + 剩余IP + 異常IP 才能確保所有IP都被掃描到了
        if len(ips) == (len(self.ip_allocated) + len(self.ip_not_allocated) + len(self.ip_error)):
            total_ip, used_ip, available_ip, err_ip = len(ips), len(self.ip_allocated), len(self.ip_not_allocated), len(
                self.ip_error)

            print("""
                 總IP:%s
                 已使用IP: %s
                 剩余IP:%s
                 故障的IP:%s
                 """ % (total_ip, used_ip, available_ip, err_ip))

        else:
            # todo 之后會增加日志記錄或者是報異常
            ...

        # todo 最后需要將結果存儲到數據庫中
        save_result = {
            '$set': {
                'total_ip': len(ips),
                'used_ip': self.ip_allocated,
                'available_ip': self.ip_not_allocated,
                'err_ip': self.ip_error,
            }
        }
        db.update_one(data, save_result)

        """處理異常的IP"""
        # todo 保留,可以對異常的IP再次發起二次ping或者是其他處理

    def send_ping(self, ip):
        """
        發送ping命令,執行IP掃描
        :param ip: 字符串形式的IP地址,例如:'172.16.2.12'
        :return:
        """
        if ip:
            try:
                pipe = subprocess.run(f"ping -c {self.icmp_count} -t {self.icmp_timeout} {ip} > /dev/null 2>&1",
                                      shell=True)

                code = pipe.returncode  # 獲取執行狀態碼 ping通:返回0,ping不通:返回 2(非0)
                if code == 0:
                    self.ip_allocated.append(ip)  # 已使用的IP
                elif code != 0 or code == 2:
                    self.ip_not_allocated.append(ip)  # 未使用的IP

            except Exception as e:
                # icmp不可達的記錄為異常IP
                print(f'ip:{ip} 訪問異常: {e}')
                self.ip_error.append(ip)  # 異常IP
        else:
            # todo 之后會增加日志記錄或者是報異常
            ...


if __name__ == "__main__":
    x = ip_check()
    x.run()

數據庫表結構:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM