簡單實現基於內存的緩存引擎,並封裝第三方庫aredis實現redis緩存
0.代碼DEMO
"""
緩存組件
"""
import time
import logging
import asyncio
from threading import Lock
from typing import Union, Any, Dict
from aredis import StrictRedis, StrictRedisCluster
class SingletonMeta(type):
"""元類——有限的單例模式
當初始化參數包含new=True時,將構造一個新的對象
"""
__instance = None
__lock = Lock()
def __call__(cls, *args, **kwargs):
with cls.__lock:
new = kwargs.pop('new', None)
if new is True:
return super().__call__(*args, **kwargs)
if not cls.__instance:
cls.__instance = super().__call__(*args, **kwargs)
return cls.__instance
class MemoryEngine:
"""本地內存作為后端緩存引擎,不支持分布式
只支持get、set方法
"""
def __init__(self):
self.namespace = {}
self._check_time = 0
self._check_interval = 60
def delete(self, key: str) -> None:
"""刪除指定緩存"""
if key in self.namespace:
del self.namespace[key]
def et_clear(self) -> None:
"""清理超時緩存"""
clear_names = []
if time.time() > self._check_time + self._check_interval:
self._check_time = time.time()
for name, block in self.namespace.items():
if block.ttl < -1:
clear_names.append(name)
for name in clear_names:
del self.namespace[name]
async def ttl(self, name) -> int:
self.et_clear()
if name not in self.namespace:
return -1
return int(self.namespace[name].ttl)
async def get(self, name):
self.et_clear()
if name not in self.namespace:
return None
return self.namespace[name].val
async def set(self, name, value, ex=None, px=None, nx=False, xx=False):
if nx and name in self.namespace:
return
if xx and name not in self.namespace:
return
self.namespace[name] = DataBlock(name, value, ex, px)
if len(value) > 16384 and (ex or px):
# 實驗性功能 大容量緩存清理機制 避免長時間不使用緩存下占用內存
life = ex if ex else px // 1000
loop = asyncio.get_event_loop()
loop.call_later(life * 2, self.delete, name)
class DataBlock:
"""內存數據塊 封裝了有效期"""
def __init__(self, name: str, value: Any, ex: float = None,
px: float = None):
"""
:param name: key名
:param value: 存儲value
:param ex: 生命周期,單位秒
:param px: 生命周期,單位毫秒
"""
self._name = name
self._value = value
self.et = time.time() - 1
if ex:
self.et += ex
if px:
self.et += (px / 1000)
if not ex and not px:
self._ttl = -1
@property
def val(self):
return self._value if self.ttl >= -1 else None
@property
def ttl(self):
if hasattr(self, '_ttl'):
return self._ttl
return self.et - time.time()
def __repr__(self):
return f'<name={self._name}>'
class Cache(metaclass=SingletonMeta):
"""一個基於redis封裝的異步緩存類,它可以快速方便切換多個緩存庫
Cache類默認使用default緩存庫,你可以使用select(db_name)切換其他庫,並且select支持
鏈式調用,但select方法並不會改變原對象指向的default緩存庫
Cache對象通過反射擁有了StrictRedis和StrictRedisCluster類下的所有方法,你可以直接對
對象執行redis命令,此外Cache還封裝了一個方法execute(command, *args, **kwargs)
相比於反射方法,使用execute方法會自動對返回數據解碼
針對字符串類型,Cache對get和set方法作了優化,當使用get和set方法時,可以同時傳遞一個序列化器,
它會查詢和存儲時自動使用序列化器,也就是說你可以使用set方法存儲任意序列化器支持的對象
"""
logger = logging.getLogger(__name__)
def __init__(self, config: dict):
"""
:param config: 緩存數據庫字典
:return: Cache對象
"""
self._default = 'default'
self._caches = {}
serializer = config.pop('serializer', 'ujson')
try:
self.serializer = __import__(serializer)
except:
self.serializer = __import__('json')
for key, value in config.items():
try:
if value.get('engine') == 'memory':
self._caches[key] = MemoryEngine()
elif 'startup_nodes' in value:
self._caches[key] = StrictRedisCluster(**value)
else:
self._caches[key] = StrictRedis(**value)
except Exception as e:
self.logger.error(e)
@property
def all(self) -> Dict[str, Union[StrictRedis, StrictRedisCluster]]:
"""返回全部緩存數據庫"""
return self._caches
@property
def current_db(self) -> Union[StrictRedis, StrictRedisCluster]:
"""返回緩存對象指向的緩存數據庫"""
return self._caches[self._default]
def select(self, name: str = 'default') -> 'Cache':
"""獲取指定緩存數據庫
支持多次鏈式調用select方法
永遠不會改變app所綁定的默認緩存數據庫
:param name: 定義的數據庫名,默認值為"default"
:return: Cache對象
"""
if name not in self._caches:
raise AttributeError(f'Cache database "{name}" not found. '
f'Please check CACHES config in settings')
obj = Cache(config={}, new=True)
obj._caches = self._caches
obj._default = name
return obj
async def execute(self, command: str, *args, **kwargs) -> Any:
"""實現結果自解碼
:param command: 執行的redis原生命令
:return: 返回redis結果的utf8解碼
"""
if hasattr(StrictRedis, command):
result = await getattr(self, command)(*args, **kwargs)
if result:
result = result.decode('utf8')
return result
else:
raise getattr(self, command)
async def get(self, name, serializer=None, **kwargs) -> Any:
"""覆蓋redis的字符串get方法,提供序列化能力
:param name: key
:param serializer: 使用指定的序列化模塊
:param kwargs: 傳遞給序列化方法
:return: 返回redis結果的反序列化對象
"""
if not serializer:
serializer = self.serializer
value = await self.current_db.get(name)
if not value:
return None
else:
if isinstance(value, bytes):
value = value.decode('utf8')
try:
return serializer.loads(value, **kwargs)
except ValueError:
return value
async def set(self, name: str, value: Any, serializer=None,
ex=None, px=None, nx=False, xx=False, **kwargs) -> bool:
"""永遠在redis層以string格式存儲,提供反序列化能力
:param name:
:param value:
:param serializer: 使用指定的序列化模塊
:param ex: 設置鍵key的過期時間,單位為秒
:param px: 設置鍵key的過期時間,單位為毫秒
:param nx: 只有鍵key不存在的時候才會設置key的值
:param xx: 只有鍵key存在的時候才會設置key的值
:param kwargs: 傳遞給反序列化方法
:return: 執行結果
"""
if not serializer:
serializer = self.serializer
_kwargs = {'ensure_ascii': True}
_kwargs.update(kwargs)
if not isinstance(value, str):
value = serializer.dumps(value, **_kwargs)
return await self.current_db.set(name, value, ex, px, nx, xx)
def handle(self, backed: str):
# TODO 查詢緩存庫
# TODO 執行handler獲取結果
# TODO 返回結果並存儲至緩存
"""提供給視圖方法的裝飾器 它緩存視圖方法返回的結果"""
def __getitem__(self, item) -> 'Cache':
return self.select(item)
def __getattr__(self, attr) -> Any:
return getattr(self.current_db, attr)
1.初始化並綁定web應用
# 緩存配置 默認庫key值為default不可更改
# serializer為使用的默認序列化模塊,不能使用serializer作為緩存數據庫的key值
# 在無顯示指定的情況下,會優先選擇ujson作為序列化模塊
# 當指定engine=memory時,使用本地內存作為緩存,本地內存緩存只支持get、set方法存取值
config = {
'serializer': 'ujson',
'default': {'engine': 'memory'},
'redis': {'host': 'localhost', 'port': 6379, 'db': 4},
}
app = Sanic()
# 實例化Cache對象並綁定
app.cache = Cache(config)
2.視圖層獲取緩存對象
# 方式一,通過app獲取(這里想吐槽一下,sanic似乎沒有提供一個方法來獲取全局app對象?)
async def hander(request):
cache = request.app.cache
# 方式二,由於Cache使用了單例模式,可以通過import導入Cache並實例化
from cache import Cache
cache = Cache()
3.使用緩存
# 1.存取數據
# get和set的存儲過程序列化數據(默認優先ujson)后存入對應緩存組件,取出過程則是逆過程(如果使用redis,會自動處理utf8解碼)
# get和set也支持傳入一個serializer參數,用於自定義序列化和反序列化器,詳見源碼
cache = request.app.cache
data = {'name': '上海'}
# 將data存入緩存,命名為mapData,有效期30秒
await cache.set("mapData", data, ex=30)
# 獲取緩存中key=mapData的值
val = await cache.get("mapData")
# 2.多緩存庫
# 獲取all屬性所有緩存庫
print(cache.all)
# 使用select方法指定緩存庫
await cache.select('redis').get('mapData')
# select支持鏈式調用
await cache.select('redis').select('default').set('mapData', data)
# 獲取選擇緩存庫的存儲引擎
engine: StrictRedis = cache.select('redis').current_db
# 執行存儲引擎所支持的命令
hash = await engine.hget('hashmap')
# 3.由於Cache通過反射機制,可以直接調用后端存儲引擎支持的方法,如redis作為后端引擎時
await cache.select('redis').sismember('name', 'value')
# 4.除了redis,簡單實現了一個無序依賴第三方組件的內存緩存引擎,其實就是通過一個字典來存儲數據,參考DataBlock源碼
# 這個緩存引擎實現了get、set、ttl三個公共接口,詳見MemoryEngine源碼