這幾天離職在家,正好沒事可以瘋狂的輸出一下,本來想寫DUBBO的源碼解析的,但是發現寫DUBBO源碼的太多了,所以找一個寫的不那么多的框架,所以就選中SOFARPC這個框架了。
SOFARPC是螞蟻金服開源的一個RPC框架,相比DUBBO它沒有這么多歷史的包袱,代碼更加簡潔,設計思路更加清晰,更加容易去理解其中的代碼。
那么為什么要去重寫原生的SPI呢?官方給出了如下解釋:
- 按需加載
- 可以有別名
- 可以有優先級進行排序和覆蓋
- 可以控制是否單例
- 可以在某些場景下使用編碼
- 可以指定擴展配置位置
- 可以排斥其他擴展點
整個流程如下:
我們以ConsumerBootstrap為例:
先要有一個抽象類:
@Extensible(singleton = false)
public abstract class ConsumerBootstrap<T> {
....
}
指定擴展實現類:
@Extension("sofa")
public class DefaultConsumerBootstrap<T> extends ConsumerBootstrap<T> {
...
}
擴展描述文件META-INF/services/sofa-rpc/com.alipay.sofa.rpc.bootstrap.ConsumerBootstrap
sofa=com.alipay.sofa.rpc.bootstrap.DefaultConsumerBootstrap
當這些准備完成后,直接調用即可。
ConsumerBootstrap sofa = ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class).getExtension("sofa");
接下來我們看看ExtensionLoaderFactory的源碼
/**
* All extension loader {Class : ExtensionLoader}
* 這個map里面裝的是所有ExtensionLoader
*/
private static final ConcurrentMap<Class, ExtensionLoader> LOADER_MAP = new ConcurrentHashMap<Class, ExtensionLoader>();
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> clazz, ExtensionLoaderListener<T> listener) {
ExtensionLoader<T> loader = LOADER_MAP.get(clazz);
if (loader == null) {
//get不到則加上鎖
synchronized (ExtensionLoaderFactory.class) {
//防止其他線程操作再get一次
loader = LOADER_MAP.get(clazz);
if (loader == null) {
loader = new ExtensionLoader<T>(clazz, listener);
LOADER_MAP.put(clazz, loader);
}
}
}
return loader;
}
然后我們看一下ExtensionLoader這個類的構造器
protected ExtensionLoader(Class<T> interfaceClass, boolean autoLoad, ExtensionLoaderListener<T> listener) {
//如果正在執行關閉,則將屬性置空后直接返回
if (RpcRunningState.isShuttingDown()) {
this.interfaceClass = null;
this.interfaceName = null;
this.listener = null;
this.factory = null;
this.extensible = null;
this.all = null;
return;
}
// 接口為空,既不是接口,也不是抽象類
if (interfaceClass == null ||
!(interfaceClass.isInterface() || Modifier.isAbstract(interfaceClass.getModifiers()))) {
throw new IllegalArgumentException("Extensible class must be interface or abstract class!");
}
//當前加載的接口類名
this.interfaceClass = interfaceClass;
//接口名字
this.interfaceName = ClassTypeUtils.getTypeStr(interfaceClass);
this.listener = listener;
//接口上必須要有Extensible注解
Extensible extensible = interfaceClass.getAnnotation(Extensible.class);
if (extensible == null) {
throw new IllegalArgumentException(
"Error when load extensible interface " + interfaceName + ", must add annotation @Extensible.");
} else {
this.extensible = extensible;
}
// 如果是單例,那么factory不為空
this.factory = extensible.singleton() ? new ConcurrentHashMap<String, T>() : null;
//這個屬性里面是這個接口的所有實現類
this.all = new ConcurrentHashMap<String, ExtensionClass<T>>();
if (autoLoad) {
//獲取到擴展點加載的路徑
List<String> paths = RpcConfigs.getListValue(RpcOptions.EXTENSION_LOAD_PATH);
for (String path : paths) {
//根據路徑加載文件
loadFromFile(path);
}
}
}
拿到所有的擴展點加載的路徑后進入到loadFromFile中進行文件的加載
protected synchronized void loadFromFile(String path) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Loading extension of extensible {} from path: {}", interfaceName, path);
}
// 默認如果不指定文件名字,就是接口名
String file = StringUtils.isBlank(extensible.file()) ? interfaceName : extensible.file().trim();
String fullFileName = path + file;
try {
ClassLoader classLoader = ClassLoaderUtils.getClassLoader(getClass());
loadFromClassLoader(classLoader, fullFileName);
} catch (Throwable t) {
if (LOGGER.isErrorEnabled()) {
LOGGER.error("Failed to load extension of extensible " + interfaceName + " from path:" + fullFileName,
t);
}
}
}
protected void loadFromClassLoader(ClassLoader classLoader, String fullFileName) throws Throwable {
Enumeration<URL> urls = classLoader != null ? classLoader.getResources(fullFileName)
: ClassLoader.getSystemResources(fullFileName);
// 可能存在多個文件。
if (urls != null) {
while (urls.hasMoreElements()) {
// 讀取一個文件
URL url = urls.nextElement();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Loading extension of extensible {} from classloader: {} and file: {}",
interfaceName, classLoader, url);
}
BufferedReader reader = null;
try {
reader = new BufferedReader(new InputStreamReader(url.openStream(), "UTF-8"));
String line;
while ((line = reader.readLine()) != null) {
readLine(url, line);
}
} catch (Throwable t) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Failed to load extension of extensible " + interfaceName
+ " from classloader: " + classLoader + " and file:" + url, t);
}
} finally {
if (reader != null) {
reader.close();
}
}
}
}
}
接下來進入到readLine,這個方法主要是讀取prop文件里面的每一行記錄,並加載該實現類的類文件校驗完后將文件添加到all屬性中
protected void readLine(URL url, String line) {
//讀取文件里面的一行記錄,並將這行記錄用=號分割
String[] aliasAndClassName = parseAliasAndClassName(line);
if (aliasAndClassName == null || aliasAndClassName.length != 2) {
return;
}
//別名
String alias = aliasAndClassName[0];
//包名
String className = aliasAndClassName[1];
// 讀取配置的實現類
Class tmp;
try {
tmp = ClassUtils.forName(className, false);
} catch (Throwable e) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Extension {} of extensible {} is disabled, cause by: {}",
className, interfaceName, ExceptionUtils.toShortString(e, 2));
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Extension " + className + " of extensible " + interfaceName + " is disabled.", e);
}
return;
}
if (!interfaceClass.isAssignableFrom(tmp)) {
throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName +
" from file:" + url + ", " + className + " is not subtype of interface.");
}
Class<? extends T> implClass = (Class<? extends T>) tmp;
// 檢查是否有可擴展標識
Extension extension = implClass.getAnnotation(Extension.class);
if (extension == null) {
throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName +
" from file:" + url + ", " + className + " must add annotation @Extension.");
} else {
String aliasInCode = extension.value();
if (StringUtils.isBlank(aliasInCode)) {
// 擴展實現類未配置@Extension 標簽
throw new IllegalArgumentException("Error when load extension of extensible " + interfaceClass +
" from file:" + url + ", " + className + "'s alias of @Extension is blank");
}
if (alias == null) {
// spi文件里沒配置,用代碼里的
alias = aliasInCode;
} else {
// spi文件里配置的和代碼里的不一致
if (!aliasInCode.equals(alias)) {
throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName +
" from file:" + url + ", aliases of " + className + " are " +
"not equal between " + aliasInCode + "(code) and " + alias + "(file).");
}
}
// 接口需要編號,實現類沒設置
if (extensible.coded() && extension.code() < 0) {
throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName +
" from file:" + url + ", code of @Extension must >=0 at " + className + ".");
}
}
// 不可以是default和*
if (StringUtils.DEFAULT.equals(alias) || StringUtils.ALL.equals(alias)) {
throw new IllegalArgumentException("Error when load extension of extensible " + interfaceName +
" from file:" + url + ", alias of @Extension must not \"default\" and \"*\" at " + className + ".");
}
// 檢查是否有存在同名的
ExtensionClass old = all.get(alias);
ExtensionClass<T> extensionClass = null;
if (old != null) {
// 如果當前擴展可以覆蓋其它同名擴展
if (extension.override()) {
// 如果優先級還沒有舊的高,則忽略
if (extension.order() < old.getOrder()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Extension of extensible {} with alias {} override from {} to {} failure, " +
"cause by: order of old extension is higher",
interfaceName, alias, old.getClazz(), implClass);
}
} else {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Extension of extensible {} with alias {}: {} has been override to {}",
interfaceName, alias, old.getClazz(), implClass);
}
// 如果當前擴展可以覆蓋其它同名擴展
extensionClass = buildClass(extension, implClass, alias);
}
}
// 如果舊擴展是可覆蓋的
else {
if (old.isOverride() && old.getOrder() >= extension.order()) {
// 如果已加載覆蓋擴展,再加載到原始擴展
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Extension of extensible {} with alias {}: {} has been loaded, ignore origin {}",
interfaceName, alias, old.getClazz(), implClass);
}
} else {
// 如果不能被覆蓋,拋出已存在異常
throw new IllegalStateException(
"Error when load extension of extensible " + interfaceClass + " from file:" + url +
", Duplicate class with same alias: " + alias + ", " + old.getClazz() + " and " + implClass);
}
}
} else {
extensionClass = buildClass(extension, implClass, alias);
}
if (extensionClass != null) {
// 檢查是否有互斥的擴展點
for (Map.Entry<String, ExtensionClass<T>> entry : all.entrySet()) {
ExtensionClass existed = entry.getValue();
if (extensionClass.getOrder() >= existed.getOrder()) {
// 新的優先級 >= 老的優先級,檢查新的擴展是否排除老的擴展
String[] rejection = extensionClass.getRejection();
if (CommonUtils.isNotEmpty(rejection)) {
for (String rej : rejection) {
existed = all.get(rej);
if (existed == null || extensionClass.getOrder() < existed.getOrder()) {
continue;
}
ExtensionClass removed = all.remove(rej);
if (removed != null) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
"Extension of extensible {} with alias {}: {} has been reject by new {}",
interfaceName, removed.getAlias(), removed.getClazz(), implClass);
}
}
}
}
} else {
String[] rejection = existed.getRejection();
if (CommonUtils.isNotEmpty(rejection)) {
for (String rej : rejection) {
if (rej.equals(extensionClass.getAlias())) {
// 被其它擴展排掉
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
"Extension of extensible {} with alias {}: {} has been reject by old {}",
interfaceName, alias, implClass, existed.getClazz());
return;
}
}
}
}
}
}
loadSuccess(alias, extensionClass);
}
}
加載完文件后我們再回到
ConsumerBootstrap sofa = ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class).getExtension("sofa");
進入到getExtension方法中
public ExtensionClass<T> getExtensionClass(String alias) {
return all == null ? null : all.get(alias);
}
public T getExtension(String alias) {
//從all屬性中拿到加載的class
ExtensionClass<T> extensionClass = getExtensionClass(alias);
if (extensionClass == null) {
throw new SofaRpcRuntimeException("Not found extension of " + interfaceName + " named: \"" + alias + "\"!");
} else {
//在加載class的時候,校驗了是否是單例,如果是單例,那么factory不為null
if (extensible.singleton() && factory != null) {
T t = factory.get(alias);
if (t == null) {
synchronized (this) {
t = factory.get(alias);
if (t == null) {
//實例化
t = extensionClass.getExtInstance();
//放入到factory,單例的class下次直接拿就好了,不需要重新創建
factory.put(alias, t);
}
}
}
return t;
} else {
//實例化
return extensionClass.getExtInstance();
}
}
}
我們進入到ExtensionClass看看getExtInstance方法
/**
* 服務端實例對象(只在是單例的時候保留)
* 用volatile修飾,保證了可見性
*/
private volatile transient T instance;
/**
* 得到服務端實例對象,如果是單例則返回單例對象,如果不是則返回新創建的實例對象
*
* @param argTypes 構造函數參數類型
* @param args 構造函數參數值
* @return 擴展點對象實例 ext instance
*/
public T getExtInstance(Class[] argTypes, Object[] args) {
if (clazz != null) {
try {
if (singleton) { // 如果是單例
if (instance == null) {
synchronized (this) {
if (instance == null) {
//通過反射創建實例
instance = ClassUtils.newInstanceWithArgs(clazz, argTypes, args);
}
}
}
return instance; // 保留單例
} else {
//通過反射創建實例
return ClassUtils.newInstanceWithArgs(clazz, argTypes, args);
}
} catch (Exception e) {
throw new SofaRpcRuntimeException("create " + clazz.getCanonicalName() + " instance error", e);
}
}
throw new SofaRpcRuntimeException("Class of ExtensionClass is null");
}
看完了SOFARPC的擴展類實現后感覺代碼寫的非常的整潔,邏輯非常的清晰,里面有很多可以學習的地方,比如線程安全用到了雙重檢查鎖和volatile保證可見性。