服務端的示例
我們首先貼上我們的服務端的示例:
public static void main(String[] args) {
ServerConfig serverConfig = new ServerConfig()
.setProtocol("bolt") // 設置一個協議,默認bolt
.setPort(12200) // 設置一個端口,默認12200
.setDaemon(false); // 非守護線程
ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
.setInterfaceId(HelloService.class.getName()) // 指定接口
.setRef(new HelloServiceImpl()) // 指定實現
.setServer(serverConfig); // 指定服務端
providerConfig.export(); // 發布服務
}
ProviderConfig#export
從示例入手我們設置好ServerConfig和ProviderConfig之后調用ProviderConfig的export方法進行暴露
ProviderConfig#export
public synchronized void export() {
if (providerBootstrap == null) {
providerBootstrap = Bootstraps.from(this);
}
//DefaultProviderBootstrap
providerBootstrap.export();
}
Bootstraps#from這個方法我們在《源碼分析---SOFARPC客戶端服務引用》中已經分析過,所以這里就不分析了,里面就是調用SOFARPC自己的SPI機制返回的是DefaultProviderBootstrap實例。
DefaultProviderBootstrap#export
然后調用DefaultProviderBootstrap#export方法
@Override
public void export() {
if (providerConfig.getDelay() > 0) { // 延遲加載,單位毫秒
Thread thread = factory.newThread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(providerConfig.getDelay());
} catch (Throwable ignore) { // NOPMD
}
doExport();
}
});
thread.start();
} else {
doExport();
}
}
這里有兩個分支,如果在設置providerConfi的時候設置了延遲屬性的話,那么就會調用NamedThreadFactory#newThread方法起一個線程,然后延遲調用doExport方法。
DefaultProviderBootstrap#doExport
/**
* 是否已發布
*/
protected transient volatile boolean exported;
/**
* 發布的服務配置
*/
protected final static ConcurrentMap<String, AtomicInteger> EXPORTED_KEYS = new ConcurrentHashMap<String, AtomicInteger>();
private void doExport() {
//校驗一下,如果服務已經暴露過了那么就不再進行暴露
if (exported) {
return;
}
// 檢查參數
checkParameters();
String appName = providerConfig.getAppName();
//key is the protocol of server,for concurrent safe
Map<String, Boolean> hasExportedInCurrent = new ConcurrentHashMap<String, Boolean>();
// 將處理器注冊到server
List<ServerConfig> serverConfigs = providerConfig.getServer();
for (ServerConfig serverConfig : serverConfigs) {
String protocol = serverConfig.getProtocol();
String key = providerConfig.buildKey() + ":" + protocol;
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, "Export provider config : {} with bean id {}", key, providerConfig.getId());
}
// 注意同一interface,同一uniqueId,不同server情況
AtomicInteger cnt = EXPORTED_KEYS.get(key); // 計數器
if (cnt == null) { // 沒有發布過
cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0));
}
//計數器加一,如果計數器的值超過了設置的exportLimit,那么就會拋出異常
int c = cnt.incrementAndGet();
hasExportedInCurrent.put(serverConfig.getProtocol(), true);
int maxProxyCount = providerConfig.getRepeatedExportLimit();
if (maxProxyCount > 0) {
if (c > maxProxyCount) {
//計數器減一
decrementCounter(hasExportedInCurrent);
// 超過最大數量,直接拋出異常
throw new SofaRpcRuntimeException("Duplicate provider config with key " + key
+ " has been exported more than " + maxProxyCount + " times!"
+ " Maybe it's wrong config, please check it."
+ " Ignore this if you did that on purpose!");
} else if (c > 1) {
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, "Duplicate provider config with key {} has been exported!"
+ " Maybe it's wrong config, please check it."
+ " Ignore this if you did that on purpose!", key);
}
}
}
}
try {
// 構造請求調用器
providerProxyInvoker = new ProviderProxyInvoker(providerConfig);
// 初始化注冊中心
if (providerConfig.isRegister()) {
//如果有設置注冊中心的話就遍歷注冊中心
List<RegistryConfig> registryConfigs = providerConfig.getRegistry();
if (CommonUtils.isNotEmpty(registryConfigs)) {
for (RegistryConfig registryConfig : registryConfigs) {
// 提前初始化Registry到ALL_REGISTRIES對象中
RegistryFactory.getRegistry(registryConfig);
}
}
}
//如果有多個配置則逐個遍歷
// 將處理器注冊到server
for (ServerConfig serverConfig : serverConfigs) {
try {
Server server = serverConfig.buildIfAbsent();
// 注冊請求調用器
server.registerProcessor(providerConfig, providerProxyInvoker);
if (serverConfig.isAutoStart()) {
server.start();
}
} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Exception e) {
LOGGER.errorWithApp(appName, "Catch exception when register processor to server: "
+ serverConfig.getId(), e);
}
}
providerConfig.setConfigListener(new ProviderAttributeListener());
// 注冊到注冊中心
register();
} catch (Exception e) {
decrementCounter(hasExportedInCurrent);
if (e instanceof SofaRpcRuntimeException) {
throw (SofaRpcRuntimeException) e;
} else {
throw new SofaRpcRuntimeException("Build provider proxy error!", e);
}
}
// 記錄一些緩存數據
RpcRuntimeContext.cacheProviderConfig(this);
exported = true;
}
doExport方法里面主要做了以下幾件事:
- 檢查參數是否正確
- 檢查注入的對象是否是接口的實現類
- providerConfig是否有設置server參數
- 檢查方法,是否有重名的方法,對方法進行黑白名單的過濾(對配置的include和exclude方法進行過濾)
- 遍歷設置的serverConfigs
- 對要發布的接口進行計數,如果超過了設置的repeatedExportLimit那么就拋出異常
- 構造請求調用器
- 初始化注冊中心
- 注冊請求調用器
- 啟動服務
- 設置監聽
- 注冊到注冊中心
接下來我們挨個分析上面的步驟
參數檢查DefaultProviderBootstrap#checkParameters
protected void checkParameters() {
// 檢查注入的ref是否接口實現類
Class proxyClass = providerConfig.getProxyClass();
String key = providerConfig.buildKey();
T ref = providerConfig.getRef();
if (!proxyClass.isInstance(ref)) {
throw ExceptionUtils.buildRuntime("provider.ref",
ref == null ? "null" : ref.getClass().getName(),
"This is not an instance of " + providerConfig.getInterfaceId()
+ " in provider config with key " + key + " !");
}
// server 不能為空
if (CommonUtils.isEmpty(providerConfig.getServer())) {
throw ExceptionUtils.buildRuntime("server", "NULL", "Value of \"server\" is not specified in provider" +
" config with key " + key + " !");
}
checkMethods(proxyClass);
}
這個方法里面主要做了一下幾件事:
- 調用
providerConfig.getProxyClass();
獲取接口class,在我們這個示例中是interface com.alipay.sofa.rpc.quickstart.HelloService - 調用
providerConfig.getRef();
獲取接口實現類引用,我們這里對應的是HelloServiceImpl - 調用
proxyClass.isInstance
判斷ref是否是接口的實現類,如果不是的話拋出異常 - 校驗server不能為空
- 調用checkMethods方法校驗方法
進入到checkMethods方法中
protected void checkMethods(Class<?> itfClass) {
ConcurrentHashMap<String, Boolean> methodsLimit = new ConcurrentHashMap<String, Boolean>();
for (Method method : itfClass.getMethods()) {
String methodName = method.getName();
if (methodsLimit.containsKey(methodName)) {
// 重名的方法
if (LOGGER.isWarnEnabled(providerConfig.getAppName())) {
LOGGER.warnWithApp(providerConfig.getAppName(), "Method with same name \"" + itfClass.getName()
+ "." + methodName + "\" exists ! The usage of overloading method in rpc is deprecated.");
}
}
// 判斷服務下方法的黑白名單
Boolean include = methodsLimit.get(methodName);
if (include == null) {
//對配置的include和exclude方法進行過濾
// 檢查是否在黑白名單中
include = inList(providerConfig.getInclude(), providerConfig.getExclude(), methodName);
methodsLimit.putIfAbsent(methodName, include);
}
}
providerConfig.setMethodsLimit(methodsLimit);
}
在這個方法里首先會去遍歷實現類的所有的方法。
- 判斷一下這個實現類里面是否有同名的方法,如果有的話會打印提示,所以看到這里我們就可以知道,我們在使用定義接口的時候最好不要定義重載的方法,官方不提倡這么做
- 如果設置了Include和Exclude參數的話,會根據這兩個參數對要發布的對象進行方法級別的過濾,默認Include是
*
,Exclude參數默認是一個空的字符串
初始化注冊中心
往下我們可以看到會調用RegistryFactory.getRegistry(registryConfig);
對注冊中心進行初始化
public static synchronized Registry getRegistry(RegistryConfig registryConfig) {
// 超過3次 是不是配錯了?
if (ALL_REGISTRIES.size() > 3) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Size of registry is greater than 3, Please check it!");
}
}
try {
// 注意:RegistryConfig重寫了equals方法,如果多個RegistryConfig屬性一樣,則認為是一個對象
Registry registry = ALL_REGISTRIES.get(registryConfig);
if (registry == null) {
//通過spi根據protocol來獲取注冊中心實例
ExtensionClass<Registry> ext = ExtensionLoaderFactory.getExtensionLoader(Registry.class)
.getExtensionClass(registryConfig.getProtocol());
if (ext == null) {
throw ExceptionUtils.buildRuntime("registry.protocol", registryConfig.getProtocol(),
"Unsupported protocol of registry config !");
}
registry = ext.getExtInstance(new Class[]{RegistryConfig.class}, new Object[]{registryConfig});
ALL_REGISTRIES.put(registryConfig, registry);
}
return registry;
} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Throwable e) {
throw new SofaRpcRuntimeException(e.getMessage(), e);
}
}
這個方法里面主要是通過SPI實例化相應的注冊中心,然后放入到ALL_REGISTRIES對象當中去。
初始化server
接下來我們會調用serverConfig.buildIfAbsent();
初始化server
public synchronized Server buildIfAbsent() {
if (server != null) {
return server;
}
// 提前檢查協議+序列化方式
// ConfigValueHelper.check(ProtocolType.valueOf(getProtocol()),
// SerializationType.valueOf(getSerialization()));
//通過工廠獲取server實例
server = ServerFactory.getServer(this);
return server;
}
public synchronized static Server getServer(ServerConfig serverConfig) {
try {
//根據端口獲取實例
Server server = SERVER_MAP.get(Integer.toString(serverConfig.getPort()));
if (server == null) {
// 算下網卡和端口
resolveServerConfig(serverConfig);
ExtensionClass<Server> ext = ExtensionLoaderFactory.getExtensionLoader(Server.class)
.getExtensionClass(serverConfig.getProtocol());
if (ext == null) {
throw ExceptionUtils.buildRuntime("server.protocol", serverConfig.getProtocol(),
"Unsupported protocol of server!");
}
//通過SPI獲取server實例
server = ext.getExtInstance();
//初始化server里面具體的參數
server.init(serverConfig);
SERVER_MAP.put(serverConfig.getPort() + "", server);
}
return server;
} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Throwable e) {
throw new SofaRpcRuntimeException(e.getMessage(), e);
}
}
然后我們會根據不同的server實現類調用不同的init方法來初始化參數,這里我們使用的是BoltServer
public void init(ServerConfig serverConfig) {
this.serverConfig = serverConfig;
// 啟動線程池
bizThreadPool = initThreadPool(serverConfig);
boltServerProcessor = new BoltServerProcessor(this);
}
調用完buildIfAbsent方法后回到doExport方法中繼續往下走,調用server.registerProcessor。這個方法里面主要是把對應的Invoker實例和實例所對應的方法緩存起來。
boltServer#registerProcessor
public void registerProcessor(ProviderConfig providerConfig, Invoker instance) {
// 緩存Invoker對象 包路徑名+類名+版本號 com.alipay.sofa.rpc.quickstart.HelloService:1.0
String key = ConfigUniqueNameGenerator.getUniqueName(providerConfig);
invokerMap.put(key, instance);
// 把這個實例所對應的類加載器緩存到SERVICE_CLASSLOADER_MAP中
ReflectCache.registerServiceClassLoader(key, providerConfig.getProxyClass().getClassLoader());
// 緩存接口的方法
for (Method m : providerConfig.getProxyClass().getMethods()) {
ReflectCache.putOverloadMethodCache(key, m);
}
}
然后調用boltServer#start方法啟動服務
public void start() {
//如果已經啟動了,那么直接返回
if (started) {
return;
}
synchronized (this) {
//雙重檢查鎖
if (started) {
return;
}
// 生成Server對象,返回的是RpcServer實例
remotingServer = initRemotingServer();
try {
//調用bolt包里面的內容,通過netty啟動服務
if (remotingServer.start()) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Bolt server has been bind to {}:{}", serverConfig.getBoundHost(),
serverConfig.getPort());
}
} else {
throw new SofaRpcRuntimeException("Failed to start bolt server, see more detail from bolt log.");
}
//設置started參數為true
started = true;
if (EventBus.isEnable(ServerStartedEvent.class)) {
EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool));
}
} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Exception e) {
throw new SofaRpcRuntimeException("Failed to start bolt server!", e);
}
}
}
注冊服務
接下來就是調用register方法注冊服務
protected void register() {
if (providerConfig.isRegister()) {
List<RegistryConfig> registryConfigs = providerConfig.getRegistry();
if (registryConfigs != null) {
for (RegistryConfig registryConfig : registryConfigs) {
//得到注冊中心對象
Registry registry = RegistryFactory.getRegistry(registryConfig);
//初始化注冊中心
registry.init();
registry.start();
try {
registry.register(providerConfig);
} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Throwable e) {
String appName = providerConfig.getAppName();
if (LOGGER.isWarnEnabled(appName)) {
LOGGER.warnWithApp(appName, "Catch exception when register to registry: "
+ registryConfig.getId(), e);
}
}
}
}
}
}
這里主要是通過RegistryFactory獲取不同的注冊中心實現類
目前來說sofarpc主要實現了一下幾種注冊中心:
SOFARegistry
Zookeeper
本地文件
Consul
Nacos
注冊中心的內容我打算放在接下來的文章中講解,所以這里我們略過。
以上就是SOFARPC服務端暴露的全部內容