總所周知,對於python而言實現tcp/udp的傳輸一般靠的是socket這個庫,而區分兩者的是建立socket的參數
SOCK_STREAM 為TCP連接
SOCK_DGRAM 為UDP連接
而一般情況下接收報文需要遵從某一些協議,這樣雙方可以通過特定的粘包解包操作處理數據。
很多情況自定義協議都是比較簡單,先接收報文頭,獲取消息長度,再獲取消息體。
但是有很多協議寫起來很麻煩,於是就用到scapy這個庫,就可以每次獲取一個報文,一般報文頭信息都是類似的
都帶有源地址,目標地址,消息長度,校驗碼
這里是一段監聽DMS報警系統的代碼
from scapy.all import * alarm_map = {"211": "未系安全帶", "205": "疲勞駕駛", "206": "疲勞駕駛", "208": "抽煙", "209": "出現異常"} def scan(target, port): has_no_connect = True while True: # 根據接口進行監聽報文 # 接口獲取由 IFACES 決定 try: # sniff開始獲取報文,iface是接口,filter可以選擇過濾報文類型, count為一次性獲取多少個pkt pkt = sniff(iface=IFACES.dev_from_index(12), filter="udp", count=1) # 每個pkt格式類型於YAML,如果沒有IP信息則代表這個包不完整 if pkt[0][IP].src == "192.168.43.1": if has_no_connect: # 展示建立連接的第一個UDP包 pkt[0].show() if Raw in pkt[0]: has_no_connect = False # load的值為bytes類型 body = pkt[0][Raw].load if b'alarm":2' in body: result = json.loads(body.decode()) if str(result.get('dms').get('alarm')) in alarm_map.keys(): now_time = datetime.datetime.now().strftime('%Y%m%d%H%M%S') message = { "event_name": alarm_map[str(result.get('dms').get('alarm'))], "event_time": now_time, } print(message) else: print(result) except Exception as e: print(pkt[0]) continue """ 調用show()可以展示數據包,抓到第一個數據包格式如下: ###[ Ethernet ]### dst = 20:0d:b0:17:cf:d4 src = 02:08:22:b2:bb:fb type = IPv4 ###[ IP ]### version = 4 ihl = 5 tos = 0x0 len = 208 id = 19582 flags = DF frag = 0 ttl = 64 proto = udp chksum = 0x163a src = 192.168.43.1 dst = 192.168.43.19 \options \ ###[ UDP ]### sport = 23456 dport = 62472 len = 188 chksum = 0x202f ###[ Raw ]### load = '{"code":3,"dms":{"alarm":0,"num":1,"id":0,"eye":0,"p":15.220612,"y":-29.808479,"r":6.351517,"fr":0.493671,"fmi":0,"fmon":0,"fx":0.389583,"fy":0.201852,"fw":0.192708,"fh":0.457407}}' """
scapy這個庫確實能夠滿足一部分需求,不用頭疼於各種協議的實現。上手也比較簡單,在一些需要抓TCP包或者UDP的場景下使用效果極佳。
如果知道協議了,這里有段比較不錯的監聽tcp/udp ip報文的代碼。 【需要用管理員權限運行,不要在IDE下運行】
from __future__ import print_function import ctypes, sys import struct import socket import traceback ip_header_fmt = '!BBHHHBBH4s4s' tcp_header_fmt = '!HHLLBBHHH' udp_header_fmt = '!HHHH' IP_HEAD_LEN = struct.calcsize(ip_header_fmt) # 20字節 TCP_HEADER_LEN = struct.calcsize(tcp_header_fmt) # 20字節 UDP_HEAD_LEN = struct.calcsize(udp_header_fmt) # 8字節 IP_MAX_LEN = 65535 def is_admin(): try: return ctypes.windll.shell32.IsUserAnAdmin() except: return False def unpack_ip_header(buf): if len(buf) < IP_HEAD_LEN: return try: iph = struct.unpack(ip_header_fmt, buf[:IP_HEAD_LEN]) protocol_map = {1: 'ICMP', 6: 'TCP', 17: 'UDP'} return { 'version': iph[0] >> 4, # 高4位 'ihl': (iph[0] & 0xF) * 4, # 低4位,每個長度單位表示4字節,最大為60字節 'tos': iph[1], # 8位 'len': iph[2], # 16位 'id': iph[3], # 16位 'offset': iph[4], # 16位 'ttl': iph[5], # 8位 'protocol': protocol_map.get(iph[6], str(iph[6])), # 8位 'cks': iph[7], # 16位 'src': iph[8], # 32位 'dst': iph[9], # 32位 } except Exception as e: raise e def unpack_tcp_header(buf): if len(buf) < TCP_HEADER_LEN: return try: tcph = struct.unpack(tcp_header_fmt, buf[:TCP_HEADER_LEN]) return { 'src': tcph[0], # 16位 'dst': tcph[1], # 16位 'seq': tcph[2], # 32位 'ack': tcph[3], # 32位 'thl': (tcph[4] >> 4) * 4, # 高4位 'wlen': tcph[6], # 16位 } except Exception as e: raise e def unpack_udp_header(buf): if len(buf) < UDP_HEAD_LEN: return try: udph = struct.unpack(udp_header_fmt, buf[:UDP_HEAD_LEN]) return { 'src': udph[0], # 16位 'dst': udph[1], # 16位 'dlen': udph[2], # 16位 'cks': udph[3], # 16位 } except Exception as e: raise e def get_host_ip(): try: sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.connect(('8.8.8.8', 80)) ip = sock.getsockname()[0] except Exception: pass finally: sock.close() return ip if __name__ == '__main__': if sys.version_info[0] == 3: ctypes.windll.shell32.ShellExecuteW(None, "runas", sys.executable, __file__, None, 1) host = get_host_ip() # 獲取當前聯網網卡的ip地址 try: sock = socket.socket( socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IP) sock.bind((host, 0)) sock.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) sock.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON) while True: buf = sock.recvfrom(IP_MAX_LEN)[0] if len(buf) < IP_HEAD_LEN: continue ipheader = unpack_ip_header(buf[:IP_HEAD_LEN]) if ipheader: buf = buf[ipheader['ihl']:] if ipheader['protocol'] == 'TCP': tcpheader = unpack_tcp_header(buf[:TCP_HEADER_LEN]) if tcpheader: data = buf[tcpheader['thl']:( ipheader['len'] - ipheader['ihl'])] if data: print('TCP {}'.format(data)) elif ipheader['protocol'] == 'UDP': udpheader = unpack_udp_header(buf[:UDP_HEAD_LEN]) if udpheader: data = buf[UDP_HEAD_LEN:][:udpheader['dlen']] if data: print('UDP {}'.format(data)) # 可以增加其他協議的頭解析 except Exception as e: traceback.print_exc() finally: sock.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF) sock.close()