前言
背景簡介
最近做了一些對接某書的廣告業務,主要還是根據自己業務的需求調用它的SDK從人家的服務器中獲取源數據然后再做一下自己這邊的業務邏輯的處理。
由於源數據不在我們本地,需要調用SDK從遠端的服務器去獲取數據,所以對於那些需要頻繁調用接口獲取的並且更新不是十分頻繁數據我們可以考慮將它們在第一次獲取到后緩存在本地,下一次再來獲取這些數據的時候可以直接從緩存中獲取,只要做好了緩存的一致性等措施,就可以避免由於網絡延遲等原因導致的調用SDK超時而獲取不到數據等等諸如此類的問題。
緩存數據庫我這邊選取了Redis,現在的業務中主要用到了Redis的string、list、與hash這三種數據類型,下面就為大家介紹一下筆者在本項目中使用Redis的一些思路。
注意事項
由於公司的業務代碼不能外泄,所以本文只提供解決問題的思路,然后講一下會遇到哪些坑以及解決這些坑的思路,給出的代碼也都是偽代碼,希望能為新手程序員提供一些解決實際問題的思路,當然歡迎老鳥們在下面評論區瘋狂吐槽,指出問題對於筆者的提升來說幫助也很大。
SDK參考的資料
關於某書的SDK可以從這里找到其源碼及文檔說明:https://github.com/facebook/facebook-python-business-sdk
文檔說明在這里(吐槽一下這文檔做的真是***):https://developers.facebook.com/docs/marketing-apis
Redis初始化操作
python版本使用的是3.6.5,redis使用第三方的redis模塊,需要手動安裝一下:
pip3 install redis
然后使用的是redis連接池,每一個redis的client每次都從初始化好的連接池中獲取連接。
這里建議大家將redis連接池這個變量放在統一的配置文件中,這樣在其他模塊中都從同一個配置文件中import這個變量時連接池就成了一個單例(基於模塊導入的單例),實現了一個單例模式。
像下面這樣:
config.py中:
import redis REDISES = { 'cache': { 'host': '127.0.0.1', 'port': 6379, 'max_connections': 300, 'db': 2, # 選擇第2個db作為緩存的庫 'password': '', }, } # redis連接池 FB_POOL = redis.ConnectionPool(**REDISES["cache"])
業務代碼中client使用單例連接池:
import redis from config import FB_POOL redis_client = redis.Redis(connection_pool=FB_POOL)
Redis的使用之:string類型
string類型用起來十分簡單,直接按照 key:value 的形式存儲就可以了。
這里需要注意2點:1是如果存入redis的value是一個字典格式的格式化的數據,需要將這個字典序列化成json數據后才能存入redis;2是python中redis的set方法可以在存儲key的時候設置key的過期時間。
我這里使用string類型存儲了國家及語言信息,因為考慮到這樣的數據改動不會很頻繁,因此緩存同步的思路是設置redis中對應key的過期時間為7天,這樣每7天請求一次SDK獲取最新的數據更新緩存就可以了。
具體的偽代碼如下:
def get_country_msg(account_id: str, result: dict) -> dict: # 如果redis中有的話從redis中獲取 否則從接口中獲取然后將數據寫入redis中並設置生存時間下次從redis中獲取 if redis_client.exists("fbadset:country_code_dic") and redis_client.exists("fbadset:country_group_area"): # print("redis中有language的數據!!!") ret_str = redis_client.get("fbadset:country_group_area") ret = json.loads(ret_str) result = {"data": ret, "code": 0, "message": "成功返回country信息"} return result # 先獲取國家名稱與編號的對應關系 ### 業務代碼省略。。。 country_lst, err = operate_obj.get_country_msg() # 然后構建返回的數據 country_code_dic = get_country_code_dic(country_lst) # 獲取 country_group 的信息構建數據 country_group_lst, err = operate_obj.get_country_msg(location_types="country_group") ret = defaultdict(dict) ### 業務代碼省略。。。 ret = 構建數據 # 在redis中存一份 7天后失效 ret_str = json.dumps(ret) country_code_str = json.dumps(country_code_dic) # set值的同時設置超時時間 7天 redis_client.set("fbadset:country_code_dic", country_code_str, nx=True, ex=7 * 24 * 3600) redis_client.set("fbadset:country_group_area", ret_str, nx=True, ex=7 * 24 * 3600) result = {"data": ret, "code": 0, "message": "成功返回country信息"} return result
Redis的使用之:list類型
主要使用list構建了一個隊列,並且使用其切片的特性進行分頁。
實際業務中,我們需要從fb的后台獲取大量的廣告數據,而這些廣告數據如果每次都通過調用SDK的方式獲取的話效率十分低下,尤其是頁碼比較大的情況,實際測試發現打開一個頁碼比較大的數據時頁面加載的時間竟然有半分多鍾!這樣用戶的體驗會非常差!
我這邊優化的思路是:在請求第一頁的數據時調用SDK接口獲取數據,與此同時將獲取到的所有數據構建成自己提前定義好的格式rpush到redis的隊列中,然后再請求其他頁面的時候從redis中使用lrange的方式根據前端傳來的頁碼以及每頁需要展示的數據切片獲取數據並處理完畢后返回給前端展示。
特別注意:這里之所以要在請求第一頁數據時調用SDK接口獲取數據是為了保證緩存的一致性!如果用戶在fb的后台創建了一條廣告但是redis中沒有及時更新的話展示的時候是一定會將新創建的廣告漏掉的!
另外需要注意的是:redis的lrange方法切片的區間是:閉區間!!!
涉及到的偽代碼如下:
# ################################## 賬戶下的所有ad ####################### page=1 size=3 fields = ["id","name","status","campaign_id","account_id"] ## 這里設置limit其實無實際意義了~因為后面都是從redis中獲取的 # ret = ad_account.get_ad_sets(params={"limit":size})
# 業務代碼 ret = ad_account.get_ads(params={"limit":10000},fields=fields) # print("ret<<<<",ret,type(ret),len(ret)) print("len<<<<<",len(ret)) # # 所有的數據 total_count = ret.total() # 只取1w條數據 if total_count > 10000: total_count = 10000 print("total_count>>>",total_count) #####存入redis的list中 # ret_lst = list() for i in range(total_count): dic = ret[i] print("dic>>>",dic) # 構建字典並序列化 存入list中 id_ = dic["id"] name = dic["name"] status = dic["status"] curr_dic = {"id":id_,"name":name,"status":status} insert_str = json.dumps(curr_dic) # print("insert_str>>>>",insert_str) # ret_lst.append(insert_str) # 將數據存在redis的list里面 # rpush
# redis_key 注意后面要加上賬戶標識:表明是哪個賬戶下的ad
r_key = "fbad:account_ads:{}".format(account_id) redis_client.rpush(r_key,insert_str) ## llen redis_count = redis_client.llen(r_key) print("redis_count>>>>",redis_count,type(redis_count)) ## lrange ———— 閉區間!!! redis_ret = redis_client.lrange(r_key,0,2) print("redis_ret>>>",redis_ret) for bit in redis_ret: # print(bit,type(bit)) res = json.loads(bit) print(res,type(res)) ## delete
# del_ret = redis_client.delete(r_key)
# print("del_ret.>>",del_ret)
Redis的使用之:hash類型
fb的廣告的層級結構大體來講有3層:Campaign、Adset與ad。這三個層級依次是包含的關系,也就是說,Campaign包含多個Adset,Adset包含多個ad。
而在Adset層級又包含許多受眾audience。
實際業務中我們需要頻繁從fb的后台拉取某一個賬戶下的受眾信息,頻繁通過SDK請求數據效率十分低下,並且這些受眾信息在fb后台創建后基本不會變了。
所以我這里還是選擇將賬戶下的受眾信息緩存到本地,頻繁的從redis中獲取數據效率要遠高於通過SDK獲取。
之所以選擇hash存儲,是因為賬戶下的受眾信息的結構可以抽象成下面這種數據結構:
"account_audiences" = { "audience_id1":{"id":"id1","name":"xxxx1","msg":{...}}, "audience_id2":{"id":"id2","name":"xxxx2","msg":{...}}, "audience_id3":{"id":"id3","name":"xxxx3","msg":{...}}, }
針對這種數據結構,我們只要定好hash外層的key,對於hash內部的key直接使用每一個audience的id就可以了!
這里專門寫了一個demo為大家簡單描述下業務實現的整體流程:
# -*- coding:utf-8 -*- import json import redis import requests from config import POOL # 導入連接池 # redis客戶端 redis_client = redis.Redis(connection_pool=POOL,max_connections=1000) # 實際UA的token —— facebook_bm_ls_access_token # access_token = '' access_token = 'xxxxxx' # 初始化與實例化賬戶對象 # a = FacebookAdsApi.init(access_token=access_token) # ad_account = AdAccount('act_906284123078550') search_url = "https://graph.facebook.com/v8.0/act_{}".format("83xxxxx") # account_id params = { "access_token": access_token, # 只獲取保存的受眾的信息 —— 保存的受眾中有targeting的數據 "fields": "saved_audiences.limit(300){id,name,targeting}", } ### 從接口中獲取 audiences_lst err = "" resp = requests.get(search_url, params=params) print("resp>>>>>",resp) print("url>>>>",resp.url) if resp.status_code == 200: audiences_ret = resp.json() audiences_lst = resp.json().get("saved_audiences",{}).get("data",[]) print("audiences_lst>>>>",audiences_lst) else: print("err>>>>",resp.json()) audiences_lst = [] ### redis哈希的操作 ———— 外層hash的key audiences_key = "fb_adset:account_saved_audiences:{}".format("8333xxxxx") # 1、 往哈希里面寫數據 if redis_client.exists(audiences_key): audiences_id_lst = redis_client.hkeys(audiences_key) print("audiences_id_lst>>>>>>",audiences_id_lst) # 假設用戶不會修改原來的值,所以:對於之前存在的數據就不往redis里面寫了 for dic in audiences_lst: id = dic["id"] # 如果接口獲取的id不在redis里面就將這個結果寫入對應的redis的字典中 # 注意redis里面的id是bytes類型的!需要轉化一下再比較! id_bytes = id.encode("utf-8") if id_bytes not in audiences_id_lst: print("NONONONONONONONONONON") value = json.dumps(dic) # hash外層的key用前面定義好的(與賬戶關聯),里層的key使用每個受眾的id redis_client.hset(audiences_key,id,value) else: print("YESYESYESYES") continue else: print("redis中沒有對應的audiences_key:{}".format(audiences_key)) # 將數據寫入redis for dic in audiences_lst: id = dic["id"] value = json.dumps(dic) # key是id, redis_client.hset(audiences_key,id,value) # 單獨為某一個key設置過期時間 redis_client.expire(audiences_key,24 * 3600) # 2、從哈希里邊獲取指定key(audienct_id)的數據 audience_id = "23845xxxxx" if redis_client.exists(audiences_key): # 因為redis里面的key是bytes類型的,需要將外面待比較的key編碼后再與hkeys得到的結果進行比較!!! au_id_bytes = audience_id.encode("utf-8") # 注意hkeys拿到的list里面的key是bytes類型的! audience_id_lst = redis_client.hkeys(audiences_key) if au_id_bytes in audience_id_lst: audience_dic = redis_client.hget(audiences_key,audience_id) print("ret>>>",audience_dic) print("type>>>>",type(audience_dic)) audience_ret = json.loads(audience_dic) print("rr>>>>>",audience_ret)
可以給大家展示下實際redis庫中的hash數據:
hkeys結果:
獲取某一個受眾id的數據:
后記
本文算是個人在實際中解決問題的一些筆記,希望能給新手程序員一些解決問題的思路,后續有更好的方案也會加進文章中去。