上下文
記的學英語的時候,總是不記的某個詞是什么意思,然后就看不下去了,只能問周圍的同學或者老師或者去查詞典,他們的建議是通過上下文去推測這個詞大概的意思,反正我那會上學時沒理解,所以英文一直比較差。
現在英語水平也沒提高多少,盡管有點領會。
后來慢慢理解了一些,因為有些詞有很多種意思,放在某個場景下可能是一個意思,放在另外一個場景下又是其它的意思,這里不舉例子了,上文有一定的相似度。
RPC客戶端上下文
客戶端由於是一個獨立的環境,所以可以認為它處於一個屬於自己的上下文中,與其它的端隔離。在上下文中可以指定一些公共的參數來提供給接口使用,比如:
RPC版本號
客戶端請求ID
各類自定義的公共參數
RPC服務端上下文
服務端同理,也處於一個屬於自己的上下文中,與其它端隔離。
RPC上下文的作用
基本線程級別的訪問,讓客戶端或者服務端能夠像訪問本地變量一樣訪問RPC框架級別的變量。比如我們想將客戶端的一個請求ID傳遞給服務端,這個請求ID作用於所有接口,比如RPC的調用鏈追蹤,有兩種方式:
接口中增加請求ID參數
這個方案顯然是不能接受的,因為需要改的接口過多。
接口不改的情況下,在RPC框架中提供一個上下文,其中包含請求ID
這個方案顯然成本最小,比如這種樣調用:RpcContext.getRequestId();
- 上圖中的Context是指RPC框架級變量的一個本地副本,為了簡化調用復雜度。
- 上圖中的Invocation是RPC框架級的變量,它與上面提到的Context相互配合,做到調用端與框架本身的解耦合
實現
定義上下文對象
在RpcContext對象中增加一個map類型的參數對象,可以存放任意擴展的參數。
public class RpcContext {
private Map<String,Object> contextParameters;
public Map<String, Object> getContextParameters() {
return contextParameters;
}
public void setContextParameters(Map<String, Object> contextParameters) {
this.contextParameters = contextParameters;
}
private static final ThreadLocal<RpcContext> rpcContextThreadLocal=new ThreadLocal<RpcContext>(){
@Override
protected RpcContext initialValue() {
RpcContext context= new RpcContext();
context.setContextParameters(Maps.newHashMap());
return context;
}
};
public static RpcContext getContext() {
return rpcContextThreadLocal.get();
}
public static void removeContext() {
rpcContextThreadLocal.remove();
}
private RpcContext() {
}
}
RPC請求對象中增加上下文參數
RpcRequest增加如下字段,用於服務端調用。
private Map<String,Object> contextParameters;
RpcInvocation接口中增加上下文參數
在后續新增加的過濾器使用。
private Map<String,Object> contextParameters;
客戶端代理
RpcProxy在組裝RpcRequest對象時,從RpcContext中獲取最新的參數傳遞給RpcReuest,從而傳遞給服務端。
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcRequest request = new RpcRequest();
//...
request.setContextParameters(RpcContext.getContext().getContextParameters());
//...
}
客戶端上下文過濾器
主要作用就是從本線線程變量中獲取參數傳遞給RpcInvocation。
注解上的order屬性文章后面詳細介紹
@ActiveFilter(group = {ConstantConfig.CONSUMER},order = -1000)
public class ClientContextFilter implements RpcFilter {
private final static Logger logger = LoggerFactory.getLogger(ClientContextFilter.class);
@Override
public Object invoke(RpcInvoker invoker, RpcInvocation invocation) {
Map<String,Object> contextParameters=invocation.getContextParameters();
if(null==contextParameters){
contextParameters= Maps.newHashMap();
}
Map<String,Object> contextParametersFromRpcContext= RpcContext.getContext().getContextParameters();
if(null!=contextParametersFromRpcContext) {
contextParameters.putAll(contextParametersFromRpcContext);
}
Object rpcResponse=invoker.invoke(invocation);
logger.info("ClientContextFilter.invoke end");
return rpcResponse;
}
}
服務端上下文過濾器
服務端上下文過濾器與客戶端的作用相反,是從RpcInvocation中獲取參數傳遞給本地線程變量RpcContext,后面在執行服務端方法時就可以方便的通過RpcContext獲取指定變量。
@ActiveFilter(group = {ConstantConfig.PROVIDER},order = -1000)
public class ServerContextFilter implements RpcFilter {
private final static Logger logger = LoggerFactory.getLogger(ServerContextFilter.class);
@Override
public Object invoke(RpcInvoker invoker, RpcInvocation invocation) {
Map<String,Object> contextParameters=invocation.getContextParameters();
RpcContext.getContext().setContextParameters(contextParameters);
Object rpcResponse=invoker.invoke(invocation);
logger.info("ServerContextFilter.invoke end");
return rpcResponse;
}
}
過濾器排序
因為我們的RpcContext是個本地線程變量,而且Rpc服務端是多線程處理業務,所以需要在請求結束后及時的清理掉相關本地線程變量信息。這就需要清理上下文的過濾動作在最后執行,否則有會出現服務端方法還沒有執行就被清空了參數。創建一個工具類,專門用來處理獲取客戶端以及服務端過濾器。
過濾器注解增加排序屬性
增加order字段,升級排列。
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface ActiveFilter {
String[] group() default {};
String[] value() default {};
int order() default 999999999;
}
過濾器工具類
創建ActiveFilterUtil,包含下面兩個函數。
過濾器排序函數
獲取特定注解的類,然后根據注解上的排序屬性升序排序。
private static List<Object> getActiveFilter(){
List<Object> rpcFilterList= Lists.newArrayList();
Map<String, Object> rpcFilterMapObject = ApplicationContextUtils.getApplicationContext().getBeansWithAnnotation(ActiveFilter.class);
if (null!=rpcFilterMapObject) {
rpcFilterList = Lists.newArrayList(rpcFilterMapObject.values());
Collections.sort(rpcFilterList, new Comparator<Object>() {
@Override
public int compare(Object o1, Object o2) {
ActiveFilter activeFilterO1 = o1.getClass().getAnnotation(ActiveFilter.class);
ActiveFilter activeFilterO2 = o2.getClass().getAnnotation(ActiveFilter.class);
return activeFilterO1.order() > activeFilterO2.order() ? 1 : -1;
}
});
}
return rpcFilterList;
}
獲取RPC過濾器列表
提供給客戶端以及服務端的一個協助方法,便於客戶端以及服務構建過濾器職責鏈。
public static Map<String,RpcFilter> getFilterMap(boolean isServer){
List<Object> rpcFilterList=getActiveFilter();
Map<String,RpcFilter> filterMap=new HashMap<>();
for (Object filterBean : rpcFilterList) {
Class<?>[] interfaces = filterBean.getClass().getInterfaces();
ActiveFilter activeFilter=filterBean.getClass().getAnnotation(ActiveFilter.class);
String includeFilterGroupName=!isServer?ConstantConfig.CONSUMER:ConstantConfig.PROVIDER;
if(null!=activeFilter.group()&& Arrays.stream(activeFilter.group()).filter(p->p.contains(includeFilterGroupName)).count()==0){
continue;
}
for(Class<?> clazz:interfaces) {
if(clazz.isAssignableFrom(RpcFilter.class)){
filterMap.put(filterBean.getClass().getName(),(RpcFilter) filterBean);
}
}
}
return filterMap;
}
客戶端以及服務端初始化
獲取過濾器map的邏輯改為調用上面ActiveFilterUtil.getFilterMap方法。
使用示例
客戶端設置參數
設置一個RPC版本號的參數。
@RequestMapping("/{productId}")
public Product getById(@PathVariable final long productId) throws UnknownHostException {
RpcContext.getContext().addContextParameter("rpc-version","1.0");
//...
}
服務端獲取參數
簡單的在服務端打印出RPC版本號。
logger.info("get context parameter from server,rpc-version={}",String.valueOf(RpcContext.getContext().getContextParameter("rpc-version")));
輸出日志如下:
本文源碼
https://github.com/jiangmin168168/jim-framework
文中代碼是依賴上述項目的,如果有不明白的可下載源碼