1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*- 3 4 """ 網絡數據包捕獲與分析程序 """ 5 6 import pcap 7 import dpkt 8 import json 9 import re 10 import time 11 from urllib import unquote 12 13 # 過濾輸出目標ip 14 dst_lists = [ 15 '203.66.1.212', # nslookup dpdcs.4399sy.com.hk 16 '52.74.10.186', # nslookup dpdcs.4399en.com 17 '52.58.69.212', # nslookup dpdcs.4399sy.ru 18 '220.241.11.3', # nslookup dpdcs.4399th.com 19 '124.243.195.63', # nslookup sdkdcs.4399sy.com 20 '42.62.106.216', # nslookup udpdcs.4399sy.com 21 '42.62.106.230', # nslookup udpdcs.4399sy.com 22 ] 23 24 req_data = "" 25 times = 0 26 27 28 def capt_data(eth_name="eth0", p_type=None): 29 """ 30 捕獲網卡數據包 31 :param eth_name 網卡名,eg. eth0,eth3... 32 :param p_type 日志捕獲類型 1:sdk日志用例分析 2:目標域名過濾輸出 3:原始數據包 33 :return: 34 """ 35 36 pc = pcap.pcap(eth_name) 37 pc.setfilter('tcp port 80') # 設置監聽過濾器 38 print 'start capture....' 39 if pc: 40 for p_time, p_data in pc: # p_time為收到時間,p_data為收到數據 41 anly_capt(p_time, p_data, p_type) 42 43 44 def anly_capt(p_time, p_data, p_type): 45 """ 46 解析數據包 47 :param p_data 收到數據 48 :param p_type 日志捕獲類型 1:sdk日志用例分析 2:目標域名過濾輸出 3:原始數據包 49 :return: 50 """ 51 52 p = dpkt.ethernet.Ethernet(p_data) 53 if p.data.__class__.__name__ == 'IP': 54 ip_data = p.data 55 src_ip = '%d.%d.%d.%d' % tuple(map(ord, list(ip_data.src))) 56 dst_ip = '%d.%d.%d.%d' % tuple(map(ord, list(ip_data.dst))) 57 if p.data.data.__class__.__name__ == 'TCP': 58 tcp_data = p.data.data 59 if tcp_data.dport == 80: 60 # print tcp_data.data 61 if tcp_data.data: 62 # 調用日志模塊,對日志進行處理 63 if p_type == 1: 64 # sdk日志用例分析 65 if dst_ip in dst_lists: 66 tmp = tcp_data.data.strip() 67 global req_data, times 68 if tmp.startswith("POST") or tmp.startswith("GET"): # or times > 0 69 if req_data: 70 haiwai_log_case(req_data) 71 req_data = tmp + "\n" 72 # times = 0 73 else: 74 req_data = req_data + tmp 75 # times = times + 1 76 77 elif p_type == 2: 78 # 目標域名過濾輸出 79 if dst_ip in dst_lists: 80 print "tcp_data:", tcp_data.data 81 82 else: 83 # 無過濾條件輸出 84 print "tcp_data:", tcp_data.data 85 86 87 # android 日誌類型,從data中獲取 88 log_type_from_data = { 89 90 'open_game': u'[打開游戲]', 91 'network_check': u'[網絡監測]', 92 'open_login': u'[登錄界面前]', 93 'select_server': u'[選服日志]', 94 'create_role': u'[創角日志]', 95 'role_level_change': u'[等級日志]', 96 97 # 海外,俄語 98 'activity_open': u'[打開游戲]', 99 'load_start_before_login': u'[加載開始]', 100 'load_finish_before_login': u'[加載結束]', 101 'activity_before_login': u'[登錄界面前]', 102 'click_enter': u'[進入游戲]', 103 'get_user_server_login': u'[選服日志]', 104 'user_create_role': u'[創角日志]', 105 'role_login': u'[角色登錄]', 106 'enter_success': u'[成功進入游戲]', 107 'role_level': u'[等級日志]', 108 'user_online': u'[在線日志]', 109 'exit_success': u'[退出游戲]', 110 111 } 112 113 # ios日誌類型,從請求資源路徑獲取 114 log_type_from_path = { 115 'activity_open.php': u'[打開游戲]', 116 'load_start_before_login.php': u'[加載開始]', 117 'load_finish_before_login.php': u'[加載結束]', 118 'activity_before_login.php': u'[登錄界面前]', 119 'click_enter.php': u'[進入游戲]', 120 'get_user_server_login.php': u'[選服日志]', 121 'user_create_role.php': u'[創角日志]', 122 'role_login.php': u'[角色登錄]', 123 'enter_success.php': u'[成功進入游戲]', 124 'user_online.php': u'[在線日志]', 125 'role_level.php': u'[等級日志]', 126 'exit_success.php': u'[退出游戲]', 127 'share.php': u'[分享日志]', 128 'init_info.php': u'[初始化日志]', 129 'event.php': u'[事件日志]', 130 'user_login.php': u'[user_login]', 131 'user_server_login.php': u'[user_server_login]', 132 'enter_game.php': u'[enter_game]', 133 } 134 135 # 過濾path 136 filter_out_list = [ 137 'u/', 138 'plugin/error/check', 139 'service/version/get_info', 140 ] 141 142 # 過濾打印出屬於列表中的host的日志。 143 host_list = [ 144 'dpdcs.4399sy.com.hk', 145 'dpdcs.4399en.com', 146 'dpdcs.4399sy.ru', 147 'dpdcs.4399th.com', 148 'sdkdcs.4399sy.com', 149 'udpdcs.4399sy.com', 150 ] 151 152 153 def formattime(t): # 日期字段格式化 154 return time.strftime('%c', time.gmtime(t + 8 * 3600)) 155 156 157 def req_to_dict(req_string): 158 """ 159 將請求數據轉換為dic 160 :param req_string: 161 :return: 162 """ 163 req_dict = {} 164 req_string = req_string.strip() 165 if len(req_string) > 0: 166 req_string = unquote(req_string) 167 # print "req_string_after_unquote:",req_string 168 m1 = re.search("(GET|POST)(.*)\?(.*)HTTP/1.1", req_string) # (method,path,param) 169 m2 = re.search("Host:(.*)", req_string) # (host,) 170 # m3 = re.search("\sdata=(.*)\s", req_string) # (body,) 171 m4 = re.search("\sdata=([\s\S]*)", req_string) # (body,) 172 # m5 = re.search("eventTime\":\"(\d+)", req_string) # (eventTime,) 173 m5 = re.search("eventTime\"\s*:\s*\"(\d+)", req_string) 174 if m1: 175 req_dict["method"] = m1.group(1).strip() 176 req_dict["path"] = m1.group(2).strip()[1:] 177 param_string = m1.group(3).strip() 178 if param_string: 179 param_string = param_string.split("&") 180 param_dict = {} 181 for item in param_string: 182 tmp_list = item.split("=") 183 if len(tmp_list) > 1: 184 param_dict[tmp_list[0]] = tmp_list[1] 185 req_dict["param"] = param_dict 186 if m2: 187 req_dict["host"] = m2.group(1).strip() 188 if m4: 189 try: 190 body = m4.group(1).replace("\n", "") 191 body = json.loads(body) 192 except ValueError: 193 print "\033[1;31;40m" 194 print "m4:Error:body ValueError,req_string-->%s" % req_string 195 print "\033[0m" 196 body = {} 197 req_dict["body"] = body 198 199 if m5: 200 req_dict["eventTime"] = formattime(int(m5.group(1))) 201 202 return req_dict 203 204 205 def haiwai_log_assert(req_dict): 206 """ 207 日志斷言處理,輸出分析結果 208 :param req_dict: 209 :return: 210 """ 211 212 # 從 data 中獲取日志類型 213 if isinstance(req_dict, dict) and req_dict.get("body"): 214 if req_dict.get("body").get("data"): 215 data_type = req_dict.get("body").get("data").keys() 216 data_type_set = set(data_type) 217 types_key_set = set(log_type_from_data.keys()) 218 intersect = data_type_set.intersection(types_key_set) 219 if intersect: 220 log_type = intersect.pop() 221 print "\033[1;31;40m %s log pass!--from body data || EventTime:-->[%s] \033[0m" % ( 222 log_type_from_data.get(log_type), req_dict.get("eventTime")) 223 print req_dict 224 else: 225 if 'common' in data_type and len(data_type) == 2: 226 data_type.remove('common') 227 print "\033[1;31;40m %s log not register!--from body data \033[0m" % data_type 228 229 # 從 path 中獲取日志類型 230 path = req_dict.get("path") 231 host = req_dict.get("host") 232 if host in host_list: 233 if path in log_type_from_path.keys(): 234 eventTime = "" 235 if req_dict.get("eventTime"): 236 eventTime = req_dict.get("eventTime") 237 else: 238 if req_dict.get("param"): 239 eventTime = req_dict.get("param").get("time") 240 if eventTime: 241 eventTime = formattime(int(eventTime)) 242 print "\033[1;31;40m %s log pass--from url || EventTime:-->[%s] \033[0m" % ( 243 log_type_from_path.get(path), eventTime) 244 print req_dict 245 elif path and path not in filter_out_list: 246 print "\033[1;31;40m %s log not register!--from url! \033[0m" % path 247 print req_dict 248 249 250 def client_log_check(log_type, req_dict, platform="sy"): 251 """ 252 檢查SDK客戶端請求字段,返回測試結果集 253 :param log_type: 日志類型 254 :param req_dict: 日志字典 255 :param platform: 測試平台 256 :return: 257 """ 258 pass 259 260 261 def haiwai_log_case(req_string): 262 """ 263 日志用例集 264 一:將數據包轉換為dict 265 二:對日志分析處理,輸出測試結果 266 """ 267 268 req_dict = req_to_dict(req_string) 269 haiwai_log_assert(req_dict) 270 271 272 if __name__ == '__main__': 273 try: 274 capt_data("eth3", 1) 275 except TypeError: 276 capt_data("eth3", 1)