一、背景:
最近工作中做了一個小功能,目的是為了分析注冊用戶區域分布和訂單的區域分布情況。所以需要將其對應的IP信息解析為歸屬地,並同步每天同步更新。
線上跑起來效率還是有優化的空間,優化的方向:在調用IP查詢API過程可以調整為多線程並行解析IP。后續會更新這方便的調整。
技術: Pyhton3
postgreSQL
env配置文件
附加信息:iP地址查詢(iP138官方企業版):https://market.aliyun.com/products/56928004/cmapi015606.html#sku=yuncode960600002
.可提供免費的IP查詢API.
二、實現思路: 1、 讀取數據庫IP信息
2、 調用第三方IP解析API進行解析
3、 將解析歸屬地信息存入數據庫
三、幾點說明: 1、環境信息等參數配置
2、日志輸出
3、異常處理: 數據庫連接異常
請求連接查詢IP的URL異常:HTTP ERROR 503
4、json,字典,數組等類型數據輸入輸出
5、分頁查詢並批量解析
5.功能實現很簡單,所以就沒有做詳細的介紹了。詳情可直接看完整代碼,有詳細的備注。
四、步驟簡單介紹:
針對實現思路的3個步驟寫了3個函數,彼此調用執行。
函數:
def get_ip_info(table_name):
def get_ip_area(table_name):
def ip_write_db(table_name):
調用:
ip_write_db("h_user_stat")
五、關鍵代碼說明:
語法:urllib.request.urlopen(url, data=None, [timeout, ]*, cafile=None, capath=None, cadefault=False, context=None)
# 對從數據庫表中出查詢的IP進行解析 querys = 'callback&datatype=jsonp&ip=' + get_ip bodys = {} url = host + path + '?' + querys request = urllib.request.Request(url) request.add_header('Authorization', 'APPCODE ' + appcode) # 連接url時可能會出現 ERROR: HTTP Error 503: Service Unavailable try: response = urllib.request.urlopen(request) except Exception as e: logging.error(e) # 輸出異常日志信息 time.sleep(5) response = urllib.request.urlopen(request) finally: content = response.read() ip_area = content.decode('utf8') ip_area = json.loads(ip_area)['data'] # json類型轉字典類型並取'data'健值 arr.append([get_ip, ip_area]) # 將結果集存於二元數組
說明:從數據庫分頁查詢固定數量的IP存入數組,並遍歷該數組並將解析后的地區信息data健值存於二元數組中。
六、Python代碼實現如下:
1 # 導入psycopg2包 2 import psycopg2, time,datetime,sys 3 import json 4 import urllib, urllib.request 5 import os 6 import configparser 7 import logging 8 # purpose: 連接數據庫讀取表IP信息 9 def get_ip_info(table_name): 10 # 全局變量作用局部作用域 11 global pagesize # 每頁查詢數據條數 12 global rows_count 13 14 # 測試1 15 starttime_1 = time.time() 16 # 建立游標,用來執行數據庫操作 17 cur = conn.cursor() 18 # 執行SQL命令 19 cur.execute("SELECT remote_ip FROM (select remote_ip,min(created_at) from " + table_name + " group by remote_ip) h1 where remote_ip is not null and remote_ip <> '' and not exists (select 1 from d_ip_area_mapping h2 where h1.remote_ip = h2.remote_ip) limit " + str(pagesize) + ";") 20 21 22 # 獲取結果集條數 23 rows_count = cur.rowcount 24 25 # print('解析用戶IP的總數:' + str(rows_count)) 26 27 # 當有未解析的用戶的IP,返回元組,否則退出程序 28 if rows_count > 0: 29 # 獲取SELECT返回的元組 30 rows = cur.fetchall() # all rows in table 31 32 for row in rows: 33 tuple = rows 34 35 conn.commit() 36 # 關閉游標 37 cur.close() 38 39 else: 40 tuple = [] 41 logging.info('每頁查詢秒數:' + str(time.time() - starttime_1)) 42 return tuple 43 # 調用解析函數 44 45 46 def get_ip_area(table_name): 47 # 內包含用戶ID和IP的數組的元組 48 tuple = get_ip_info(table_name) 49 50 # 測試2 51 starttime_2 = time.time() 52 host = 'http://ali.ip138.com' 53 path = '/ip/' 54 method = 'GET' 55 appcode = '917058e6d7c84104b7cab9819de54b6e' 56 arr = [] 57 for row in tuple: 58 59 get_ip = row[0] 60 #get_user = "".join(str(row)) 61 #get_user = row[0] 62 63 # 對從數據庫表中出查詢的IP進行解析 64 querys = 'callback&datatype=jsonp&ip=' + get_ip 65 bodys = {} 66 url = host + path + '?' + querys 67 request = urllib.request.Request(url) 68 request.add_header('Authorization', 'APPCODE ' + appcode) 69 70 # 連接url時可能會出現 ERROR: HTTP Error 503: Service Unavailable 71 try: 72 response = urllib.request.urlopen(request) 73 except Exception as e: 74 logging.error(e) # 輸出異常日志信息 75 time.sleep(5) 76 response = urllib.request.urlopen(request) 77 finally: 78 content = response.read() 79 ip_area = content.decode('utf8') 80 ip_area = json.loads(ip_area)['data'] # json類型轉字典類型並取'data'健值 81 arr.append([get_ip, ip_area]) # 將結果集存於二元數組 82 logging.info('每頁解析秒數:' + str(time.time() - starttime_2)) 83 return arr 84 85 86 def ip_write_db(table_name): 87 88 write_ip = get_ip_area(table_name) # 內包含用戶ID和IP的數組的元組 89 90 91 # 測試1 92 starttime_3 = time.time() 93 94 # 建立游標,用來執行數據庫操作 95 cur = conn.cursor() 96 for row in write_ip: 97 # get_user = row[0] # 獲取用戶ID 98 get_ip = row[0] # 獲取用戶對應的IP 99 country = row[1][0] # 獲取IP解析后的地區:國家 100 province = row[1][1] # 獲取IP解析后的地區:省 101 city = row[1][2] # 獲取IP解析后的地區:市 102 isp = row[1][3] # 獲取IP解析后的服務提供商 103 104 # 執行SQL命令 105 sql = "insert into d_ip_area_mapping(remote_ip,country,province,city,isp,created_at,updated_at,job_id) values (%s,%s,%s,%s,%s,%s,%s,%s);" 106 val = [get_ip, country, province, city, isp, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), 107 time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),time.strftime("%Y-%m-%d",time.localtime())] 108 109 cur.execute(sql, val) 110 conn.commit() 111 # 關閉游標 112 cur.close() 113 logging.info('每頁插入秒數:' + str(time.time() - starttime_3)) 114 115 116 # 1.程序開始執行計時 117 starttime = time.time() 118 119 120 # 讀取配置文件環境信息 121 122 # 項目路徑 123 rootDir = os.path.split(os.path.realpath(__file__))[0] 124 125 126 ############################### config.env文件路徑 ############################################################# 127 128 configFilePath = os.path.join(rootDir, 'db_udw.env') 129 config = configparser.ConfigParser() 130 config.read(configFilePath) 131 132 # 讀取數據庫環境信息 133 db_database = config.get('postgresql','database') 134 db_user = config.get('postgresql','user') 135 db_password = config.get('postgresql','password') 136 db_host = config.get('postgresql','host') 137 db_port = config.get('postgresql','port') 138 139 # 讀取輸出日志路徑 140 log = config.get('log','log_path') 141 142 # 每頁查詢數據條數 143 pagesize = config.get('page','pagesize') 144 145 # 讀取解析IP條數限制 146 ip_num_limit = config.get('ip_num','ip_num_limit') 147 148 # 配置輸出日志格式 149 logging.basicConfig(level=logging.DEBUG,#控制台打印的日志級別 150 filename='{my_log_path}/ip_analyzer.log'.format(my_log_path=log), # 指定日志文件及路徑 151 filemode='a',##模式,有w和a,w就是寫模式,每次都會重新寫日志,覆蓋之前的日志 #a是追加模式,默認如果不寫的話,就是追加模式 152 format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'#日志格式 153 ) 154 155 ############################### 程序開始執行 ############################################################# 156 try: 157 158 # 連接到一個給定的數據庫 159 conn = psycopg2.connect(database=db_database, user=db_user, password=db_password, host=db_host, port=db_port) 160 except Exception as e: 161 logging.error(e) # 輸出連接異常日志信息 162 163 # 返回查詢行數 默認為0 164 rows_count = 0 165 # 用戶表IP解析總數 166 user_ip_num = 0 167 # 訂單表IP解析總數 168 order_ip_num = 0 169 170 171 172 try: 173 174 # 解析用戶表注冊IP信息 175 while user_ip_num <= eval(ip_num_limit): 176 i = 1 # 循環次數 177 ip_write_db("h_user_stat") 178 user_ip_num = user_ip_num + rows_count*i 179 i = i + 1 180 if rows_count == 0 : 181 break 182 183 # 解析訂單表下單IP信息 184 while user_ip_num <= eval(ip_num_limit): 185 # 解析用戶表注冊IP信息 186 i = 1 # 循環次數 187 ip_write_db("h_order") 188 order_ip_num = order_ip_num + rows_count*i 189 i = i + 1 190 if rows_count == 0 : 191 break 192 except Exception as e: 193 logging.error(e) # 輸出異常日志信息 194 finally: 195 # 關閉數據庫連接 196 conn.close() 197 198 # 2 程序結束執行計時 199 endtime = time.time() 200 201 # print('解析用戶IP的總數:' + str(user_ip_num)) 202 # print('解析訂單IP的總數:' + str(order_ip_num)) 203 # # 打印程序執行總耗時 204 # print('解析總耗時秒數:' + str(endtime - starttime)) 205 logging.info('解析用戶IP的總數:' + str(user_ip_num)) 206 logging.info('解析訂單IP的總數:' + str(order_ip_num)) 207 logging.info('解析總耗時秒數:' + str(endtime - starttime))
環境配置db_udw.envdb_udw.env 如下:
# 數據庫環境信息 [postgresql] database = ahaschool_udw user = admin password = 123456 host = 127.0.0.0 port = 5432 # 設置日志文件路徑 [log] log_path = /home/hjmrunning/bi_etl_product/scripts/log # 每頁查詢數據條數 [page] pagesize = 1000 # IP解析條數限制 [ip_num] ip_num_limit = 150000
最后
我接觸Python時間也不是很久,實現方法可能會有疏漏。如果有什么疑問和見解,歡迎評論區交流。