通過pcap與dpkt抓包解包示例:
#!/usr/bin/env python # -*- coding: utf-8 -*- """ 網絡數據包捕獲與分析程序 """ import pcap import dpkt import json import re import time from urllib import unquote # 過濾輸出目標ip dst_lists = [ '203.66.1.212', # nslookup dpdcs.4399sy.com.hk '52.74.10.186', # nslookup dpdcs.4399en.com '52.58.69.212', # nslookup dpdcs.4399sy.ru '220.241.11.3', # nslookup dpdcs.4399th.com '124.243.195.63', # nslookup sdkdcs.4399sy.com '42.62.106.216', # nslookup udpdcs.4399sy.com '42.62.106.230', # nslookup udpdcs.4399sy.com ] req_data = "" times = 0 def capt_data(eth_name="eth0", p_type=None): """ 捕獲網卡數據包 :param eth_name 網卡名,eg. eth0,eth3... :param p_type 日志捕獲類型 1:sdk日志用例分析 2:目標域名過濾輸出 3:原始數據包 :return: """ pc = pcap.pcap(eth_name) pc.setfilter('tcp port 80') # 設置監聽過濾器 print 'start capture....' if pc: for p_time, p_data in pc: # p_time為收到時間,p_data為收到數據 anly_capt(p_time, p_data, p_type) def anly_capt(p_time, p_data, p_type): """ 解析數據包 :param p_data 收到數據 :param p_type 日志捕獲類型 1:sdk日志用例分析 2:目標域名過濾輸出 3:原始數據包 :return: """ p = dpkt.ethernet.Ethernet(p_data) if p.data.__class__.__name__ == 'IP': ip_data = p.data src_ip = '%d.%d.%d.%d' % tuple(map(ord, list(ip_data.src))) dst_ip = '%d.%d.%d.%d' % tuple(map(ord, list(ip_data.dst))) if p.data.data.__class__.__name__ == 'TCP': tcp_data = p.data.data if tcp_data.dport == 80: # print tcp_data.data if tcp_data.data: # 調用日志模塊,對日志進行處理 if p_type == 1: # sdk日志用例分析 if dst_ip in dst_lists: tmp = tcp_data.data.strip() global req_data, times if tmp.startswith("POST") or tmp.startswith("GET"): # or times > 0 if req_data: haiwai_log_case(req_data) req_data = tmp + "\n" # times = 0 else: req_data = req_data + tmp # times = times + 1 elif p_type == 2: # 目標域名過濾輸出 if dst_ip in dst_lists: print "tcp_data:", tcp_data.data else: # 無過濾條件輸出 print "tcp_data:", tcp_data.data # android 日誌類型,從data中獲取 log_type_from_data = { 'open_game': u'[打開游戲]', 'network_check': u'[網絡監測]', 'open_login': u'[登錄界面前]', 'select_server': u'[選服日志]', 'create_role': u'[創角日志]', 'role_level_change': u'[等級日志]', # 海外,俄語 'activity_open': u'[打開游戲]', 'load_start_before_login': u'[加載開始]', 'load_finish_before_login': u'[加載結束]', 'activity_before_login': u'[登錄界面前]', 'click_enter': u'[進入游戲]', 'get_user_server_login': u'[選服日志]', 'user_create_role': u'[創角日志]', 'role_login': u'[角色登錄]', 'enter_success': u'[成功進入游戲]', 'role_level': u'[等級日志]', 'user_online': u'[在線日志]', 'exit_success': u'[退出游戲]', } # ios日誌類型,從請求資源路徑獲取 log_type_from_path = { 'activity_open.php': u'[打開游戲]', 'load_start_before_login.php': u'[加載開始]', 'load_finish_before_login.php': u'[加載結束]', 'activity_before_login.php': u'[登錄界面前]', 'click_enter.php': u'[進入游戲]', 'get_user_server_login.php': u'[選服日志]', 'user_create_role.php': u'[創角日志]', 'role_login.php': u'[角色登錄]', 'enter_success.php': u'[成功進入游戲]', 'user_online.php': u'[在線日志]', 'role_level.php': u'[等級日志]', 'exit_success.php': u'[退出游戲]', 'share.php': u'[分享日志]', 'init_info.php': u'[初始化日志]', 'event.php': u'[事件日志]', 'user_login.php': u'[user_login]', 'user_server_login.php': u'[user_server_login]', 'enter_game.php': u'[enter_game]', } # 過濾path filter_out_list = [ 'u/', 'plugin/error/check', 'service/version/get_info', ] # 過濾打印出屬於列表中的host的日志。 host_list = [ 'dpdcs.4399sy.com.hk', 'dpdcs.4399en.com', 'dpdcs.4399sy.ru', 'dpdcs.4399th.com', 'sdkdcs.4399sy.com', 'udpdcs.4399sy.com', ] def formattime(t): # 日期字段格式化 return time.strftime('%c', time.gmtime(t + 8 * 3600)) def req_to_dict(req_string): """ 將請求數據轉換為dic :param req_string: :return: """ req_dict = {} req_string = req_string.strip() if len(req_string) > 0: req_string = unquote(req_string) # print "req_string_after_unquote:",req_string m1 = re.search("(GET|POST)(.*)\?(.*)HTTP/1.1", req_string) # (method,path,param) m2 = re.search("Host:(.*)", req_string) # (host,) # m3 = re.search("\sdata=(.*)\s", req_string) # (body,) m4 = re.search("\sdata=([\s\S]*)", req_string) # (body,) # m5 = re.search("eventTime\":\"(\d+)", req_string) # (eventTime,) m5 = re.search("eventTime\"\s*:\s*\"(\d+)", req_string) if m1: req_dict["method"] = m1.group(1).strip() req_dict["path"] = m1.group(2).strip()[1:] param_string = m1.group(3).strip() if param_string: param_string = param_string.split("&") param_dict = {} for item in param_string: tmp_list = item.split("=") if len(tmp_list) > 1: param_dict[tmp_list[0]] = tmp_list[1] req_dict["param"] = param_dict if m2: req_dict["host"] = m2.group(1).strip() if m4: try: body = m4.group(1).replace("\n", "") body = json.loads(body) except ValueError: print "\033[1;31;40m" print "m4:Error:body ValueError,req_string-->%s" % req_string print "\033[0m" body = {} req_dict["body"] = body if m5: req_dict["eventTime"] = formattime(int(m5.group(1))) return req_dict def haiwai_log_assert(req_dict): """ 日志斷言處理,輸出分析結果 :param req_dict: :return: """ # 從 data 中獲取日志類型 if isinstance(req_dict, dict) and req_dict.get("body"): if req_dict.get("body").get("data"): data_type = req_dict.get("body").get("data").keys() data_type_set = set(data_type) types_key_set = set(log_type_from_data.keys()) intersect = data_type_set.intersection(types_key_set) if intersect: log_type = intersect.pop() print "\033[1;31;40m %s log pass!--from body data || EventTime:-->[%s] \033[0m" % ( log_type_from_data.get(log_type), req_dict.get("eventTime")) print req_dict else: if 'common' in data_type and len(data_type) == 2: data_type.remove('common') print "\033[1;31;40m %s log not register!--from body data \033[0m" % data_type # 從 path 中獲取日志類型 path = req_dict.get("path") host = req_dict.get("host") if host in host_list: if path in log_type_from_path.keys(): eventTime = "" if req_dict.get("eventTime"): eventTime = req_dict.get("eventTime") else: if req_dict.get("param"): eventTime = req_dict.get("param").get("time") if eventTime: eventTime = formattime(int(eventTime)) print "\033[1;31;40m %s log pass--from url || EventTime:-->[%s] \033[0m" % ( log_type_from_path.get(path), eventTime) print req_dict elif path and path not in filter_out_list: print "\033[1;31;40m %s log not register!--from url! \033[0m" % path print req_dict def client_log_check(log_type, req_dict, platform="sy"): """ 檢查SDK客戶端請求字段,返回測試結果集 :param log_type: 日志類型 :param req_dict: 日志字典 :param platform: 測試平台 :return: """ pass def haiwai_log_case(req_string): """ 日志用例集 一:將數據包轉換為dict 二:對日志分析處理,輸出測試結果 """ req_dict = req_to_dict(req_string) haiwai_log_assert(req_dict) if __name__ == '__main__': try: capt_data("eth3", 1) except TypeError: capt_data("eth3", 1)
