模塊加載機制
基本概述
Module
是Skywalking
在OAP
提供的一種管理功能特性的機制。通過Module
機制,可以方便的定義模塊,並且可以提供多種實現,在配置文件中任意選擇實現。
模塊相關配置文件可以參考:Backend setup、Configuration Vocabulary
類圖
Skywalking
中模塊管理相關功能都在 org.apache.skywalking.oap.server.library.module
包下。
通過類圖可以了解 Skywalking
模塊機制大致分成如下幾個模塊:
- 模塊配置:
ApplicationConfiguration
、ModuleConfiguration
、ProviderConfiguration
- PS:剛好對應
application.yml
三層結構:模塊->模塊實現->某個模塊實現的配置。
- PS:剛好對應
- 模塊定義類:
ModuleDefine
- 模塊提供類:
ModuleProvider
- 服務:
Service
- 管理類:
ModuleManager
- 一些輔助類
ModuleDefineHolder
:模塊管理類需要實現的接口,提供查找模塊相關功能ModuleProviderHolder
:模塊定義類需要實現的接口,提供獲取模塊的服務類功能ModuleServiceHolder
:模塊提供類需要實現的接口,提供注冊服務實現、獲取服務對象的功能ModuleConfig
:模塊配置類,模塊定義類會將ProviderConfiguration
映射為ModuleConfig
ApplicationConfigLoader
:ApplicationConfiguration
的輔助類,將application.yml
配置文件加載到內存, 設置selector
對應的Provider
的配置信息
類圖源文件:Skywalking-Module.uml
源碼解析
ModuleDefine
package org.apache.skywalking.oap.server.library.module;
import java.lang.reflect.Field;
import java.util.Enumeration;
import java.util.Properties;
import java.util.ServiceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// 模塊定義
public abstract class ModuleDefine implements ModuleProviderHolder {
private static final Logger LOGGER = LoggerFactory.getLogger(ModuleDefine.class);
// 模塊實際的
private ModuleProvider loadedProvider = null;
private final String name;
public ModuleDefine(String name) {
this.name = name;
}
// 模塊名
public final String name() {
return name;
}
// 實現類可以定義模塊提供的服務類
public abstract Class[] services();
/**
* Run the prepare stage for the module, including finding all potential providers, and asking them to prepare.
*
* @param moduleManager of this module
* @param configuration of this module
* @throws ProviderNotFoundException when even don't find a single one providers.
*/
// 准備階段,找到configuration配置類對應的ModuleProvider對象,進行初始化操作
void prepare(
// 模塊管理對象
ModuleManager moduleManager,
// 模塊配置類
ApplicationConfiguration.ModuleConfiguration configuration,
// 模塊提供類的服務加載器
ServiceLoader<ModuleProvider> moduleProviderLoader
) throws ProviderNotFoundException, ServiceNotProvidedException, ModuleConfigException, ModuleStartException {
// 找到configuration配置類對應的ModuleProvider對象
for (ModuleProvider provider : moduleProviderLoader) {
if (!configuration.has(provider.name())) {
continue;
}
if (provider.module().equals(getClass())) {
if (loadedProvider == null) {
loadedProvider = provider;
loadedProvider.setManager(moduleManager);
loadedProvider.setModuleDefine(this);
} else {
throw new DuplicateProviderException(this.name() + " module has one " + loadedProvider.name() + "[" + loadedProvider.getClass().getName() + "] provider already, " + provider.name() + "[" + provider.getClass().getName() + "] is defined as 2nd provider.");
}
}
}
if (loadedProvider == null) {
throw new ProviderNotFoundException(this.name() + " module no provider found.");
}
// 復制提供類的配置文件至ModuleConfig對象
LOGGER.info("Prepare the {} provider in {} module.", loadedProvider.name(), this.name());
try {
copyProperties(
loadedProvider.createConfigBeanIfAbsent(),
configuration.getProviderConfiguration(loadedProvider.name()),
this.name(),
loadedProvider.name()
);
} catch (IllegalAccessException e) {
throw new ModuleConfigException(this.name() + " module config transport to config bean failure.", e);
}
// 模塊提供對象進入准備階段
loadedProvider.prepare();
}
// 使用反射復制屬性
private void copyProperties(ModuleConfig dest, Properties src, String moduleName, String providerName) throws IllegalAccessException {
if (dest == null) {
return;
}
Enumeration<?> propertyNames = src.propertyNames();
while (propertyNames.hasMoreElements()) {
String propertyName = (String) propertyNames.nextElement();
Class<? extends ModuleConfig> destClass = dest.getClass();
try {
Field field = getDeclaredField(destClass, propertyName);
field.setAccessible(true);
field.set(dest, src.get(propertyName));
} catch (NoSuchFieldException e) {
LOGGER.warn(propertyName + " setting is not supported in " + providerName + " provider of " + moduleName + " module");
}
}
}
private Field getDeclaredField(Class<?> destClass, String fieldName) throws NoSuchFieldException {
if (destClass != null) {
Field[] fields = destClass.getDeclaredFields();
for (Field field : fields) {
if (field.getName().equals(fieldName)) {
return field;
}
}
return getDeclaredField(destClass.getSuperclass(), fieldName);
}
throw new NoSuchFieldException();
}
// 獲取模塊定義對應的Provider對象
@Override
public final ModuleProvider provider() throws DuplicateProviderException, ProviderNotFoundException {
if (loadedProvider == null) {
throw new ProviderNotFoundException("There is no module provider in " + this.name() + " module!");
}
return loadedProvider;
}
}
ModuleProviderHolder
package org.apache.skywalking.oap.server.library.module;
// 模塊提供持有接口,通過該接口,可以獲取模塊Provider對象對應的Service持有接口,從而拿到模塊Provider對象對應的服務對象
public interface ModuleProviderHolder {
// 獲取模塊提供對象
ModuleServiceHolder provider() throws DuplicateProviderException, ProviderNotFoundException;
}
ModuleProvider
package org.apache.skywalking.oap.server.library.module;
import java.util.HashMap;
import java.util.Map;
import lombok.Setter;
// 模塊提供抽象類,所有的模塊提供類都需要繼承該抽象類
// 一個模塊定義可以配置多個模塊提供類,通過在application.yml進行切換
public abstract class ModuleProvider implements ModuleServiceHolder {
// 模塊管理器
@Setter
private ModuleManager manager;
// 模塊定義對象
@Setter
private ModuleDefine moduleDefine;
// 模塊提供對應的服務對象map
private final Map<Class<? extends Service>, Service> services = new HashMap<>();
public ModuleProvider() {
}
protected final ModuleManager getManager() {
return manager;
}
// 獲取服務提供實現類的name,需要子類實現
public abstract String name();
// 定義模塊提供者所實現的模塊定義類
public abstract Class<? extends ModuleDefine> module();
// 創建模塊定義配置對象
public abstract ModuleConfig createConfigBeanIfAbsent();
// 准備階段(初始化與其他模塊無關的事情)
public abstract void prepare() throws ServiceNotProvidedException, ModuleStartException;
// 啟動階段(該階段模塊間可以互相操作)
public abstract void start() throws ServiceNotProvidedException, ModuleStartException;
// 完成后通知階段(在所有模塊成功啟動后執行)
public abstract void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException;
// 該模塊需要依賴的其他模塊名
public abstract String[] requiredModules();
// 注冊服務實現類
@Override
public final void registerServiceImplementation(Class<? extends Service> serviceType, Service service) throws ServiceNotProvidedException {
if (serviceType.isInstance(service)) {
this.services.put(serviceType, service);
} else {
throw new ServiceNotProvidedException(serviceType + " is not implemented by " + service);
}
}
// 確保所有服務被實現
void requiredCheck(Class<? extends Service>[] requiredServices) throws ServiceNotProvidedException {
if (requiredServices == null)
return;
for (Class<? extends Service> service : requiredServices) {
if (!services.containsKey(service)) {
throw new ServiceNotProvidedException("Service:" + service.getName() + " not provided");
}
}
if (requiredServices.length != services.size()) {
throw new ServiceNotProvidedException("The " + this.name() + " provider in " + moduleDefine.name() + " moduleDefine provide more service implementations than ModuleDefine requirements.");
}
}
// 獲取服務實現對象
@Override
public @SuppressWarnings("unchecked")
<T extends Service> T getService(Class<T> serviceType) throws ServiceNotProvidedException {
Service serviceImpl = services.get(serviceType);
if (serviceImpl != null) {
return (T) serviceImpl;
}
throw new ServiceNotProvidedException("Service " + serviceType.getName() + " should not be provided, based on moduleDefine define.");
}
ModuleDefine getModule() {
return moduleDefine;
}
String getModuleName() {
return moduleDefine.name();
}
}
ModuleConfig
package org.apache.skywalking.oap.server.library.module;
// 模塊配置類
public abstract class ModuleConfig {
}
ModuleServiceHolder
package org.apache.skywalking.oap.server.library.module;
// 模塊服務持有接口
public interface ModuleServiceHolder {
// 注冊服務實現對象
void registerServiceImplementation(Class<? extends Service> serviceType, Service service) throws ServiceNotProvidedException;
// 獲取服務實現對象
<T extends Service> T getService(Class<T> serviceType) throws ServiceNotProvidedException;
}
Service
package org.apache.skywalking.oap.server.library.module;
// 服務接口
public interface Service {
}
ModuleDefineHolder
package org.apache.skywalking.oap.server.library.module;
// 模塊定義持有接口
public interface ModuleDefineHolder {
// 判斷是否有該模塊
boolean has(String moduleName);
// 通過模塊名獲取模塊定義對象
ModuleProviderHolder find(String moduleName) throws ModuleNotFoundRuntimeException;
}
ModuleManager
package org.apache.skywalking.oap.server.library.module;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.ServiceLoader;
// 模塊管理類,管理模塊的生命周期
public class ModuleManager implements ModuleDefineHolder {
// 所有模塊是否已經通過准備階段
private boolean isInPrepareStage = true;
// 所有被加載的模塊定義對象map
private final Map<String, ModuleDefine> loadedModules = new HashMap<>();
// 初始化所有配置的模塊
public void init(ApplicationConfiguration applicationConfiguration) throws ModuleNotFoundException, ProviderNotFoundException, ServiceNotProvidedException, CycleDependencyException, ModuleConfigException, ModuleStartException {
// 獲取配置類中的模塊名
String[] moduleNames = applicationConfiguration.moduleList();
// SPI加載所有模塊定義對象
ServiceLoader<ModuleDefine> moduleServiceLoader = ServiceLoader.load(ModuleDefine.class);
// SPI加載所有模塊提供對象
ServiceLoader<ModuleProvider> moduleProviderLoader = ServiceLoader.load(ModuleProvider.class);
// 所有配置類中定義的模塊,進行准備階段
LinkedList<String> moduleList = new LinkedList<>(Arrays.asList(moduleNames));
for (ModuleDefine module : moduleServiceLoader) {
for (String moduleName : moduleNames) {
if (moduleName.equals(module.name())) {
module.prepare(this, applicationConfiguration.getModuleConfiguration(moduleName), moduleProviderLoader);
loadedModules.put(moduleName, module);
moduleList.remove(moduleName);
}
}
}
// 准備階段結束
isInPrepareStage = false;
if (moduleList.size() > 0) {
throw new ModuleNotFoundException(moduleList.toString() + " missing.");
}
// 根據模塊提供對象中的requiredModules方法,確定模塊的初始化順序(被依賴的模塊先行加載)
BootstrapFlow bootstrapFlow = new BootstrapFlow(loadedModules);
// 所有模塊進入啟動階段
bootstrapFlow.start(this);
// 所有模塊進入完成后通知階段
bootstrapFlow.notifyAfterCompleted();
}
// 判斷是否有該模塊
@Override
public boolean has(String moduleName) {
return loadedModules.get(moduleName) != null;
}
// 通過模塊名獲取模塊定義對象
@Override
public ModuleProviderHolder find(String moduleName) throws ModuleNotFoundRuntimeException {
assertPreparedStage();
ModuleDefine module = loadedModules.get(moduleName);
if (module != null)
return module;
throw new ModuleNotFoundRuntimeException(moduleName + " missing.");
}
// 斷言是否還在准備階段,如果還在准備階段,則拋出異常
private void assertPreparedStage() {
if (isInPrepareStage) {
throw new AssertionError("Still in preparing stage.");
}
}
}
BootstrapFlow
package org.apache.skywalking.oap.server.library.module;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// 根據模塊提供對象中的requiredModules方法,確定模塊的初始化順序(被依賴的模塊先行加載)
class BootstrapFlow {
private static final Logger LOGGER = LoggerFactory.getLogger(BootstrapFlow.class);
private Map<String, ModuleDefine> loadedModules;
// 按依賴順序排序的模塊提供對象列表
private List<ModuleProvider> startupSequence;
BootstrapFlow(Map<String, ModuleDefine> loadedModules) throws CycleDependencyException, ModuleNotFoundException {
this.loadedModules = loadedModules;
startupSequence = new LinkedList<>();
// 被依賴的模塊先行加載
makeSequence();
}
@SuppressWarnings("unchecked")
void start(
ModuleManager moduleManager) throws ModuleNotFoundException, ServiceNotProvidedException, ModuleStartException {
for (ModuleProvider provider : startupSequence) {
LOGGER.info("start the provider {} in {} module.", provider.name(), provider.getModuleName());
provider.requiredCheck(provider.getModule().services());
provider.start();
}
}
void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
for (ModuleProvider provider : startupSequence) {
provider.notifyAfterCompleted();
}
}
private void makeSequence() throws CycleDependencyException, ModuleNotFoundException {
List<ModuleProvider> allProviders = new ArrayList<>();
// 判斷所有被依賴的模塊是否存在
for (final ModuleDefine module : loadedModules.values()) {
String[] requiredModules = module.provider().requiredModules();
if (requiredModules != null) {
for (String requiredModule : requiredModules) {
if (!loadedModules.containsKey(requiredModule)) {
throw new ModuleNotFoundException(requiredModule + " module is required by " + module.provider().getModuleName() + "." + module.provider().name() + ", but not found.");
}
}
}
allProviders.add(module.provider());
}
do {
int numOfToBeSequenced = allProviders.size();
for (int i = 0; i < allProviders.size(); i++) {
ModuleProvider provider = allProviders.get(i);
String[] requiredModules = provider.requiredModules();
if (CollectionUtils.isNotEmpty(requiredModules)) {
// 是否所有依賴的模塊都在startupSequence中
boolean isAllRequiredModuleStarted = true;
for (String module : requiredModules) {
boolean exist = false;
for (ModuleProvider moduleProvider : startupSequence) {
if (moduleProvider.getModuleName().equals(module)) {
exist = true;
break;
}
}
if (!exist) {
isAllRequiredModuleStarted = false;
break;
}
}
// 所有依賴的模塊都在startupSequence,則將該模塊提供對象加入startupSequence
if (isAllRequiredModuleStarted) {
startupSequence.add(provider);
allProviders.remove(i);
i--;
}
} else {
// 如果該模塊提供對象不依賴任何其他模塊,則加入startupSequence
startupSequence.add(provider);
allProviders.remove(i);
i--;
}
}
// 如果一次循環后,沒有任何一個對象加入到startupSequence,則證明有循環依賴
if (numOfToBeSequenced == allProviders.size()) {
StringBuilder unSequencedProviders = new StringBuilder();
allProviders.forEach(provider -> unSequencedProviders.append(provider.getModuleName()).append("[provider=").append(provider.getClass().getName()).append("]\n"));
throw new CycleDependencyException("Exist cycle module dependencies in \n" + unSequencedProviders.substring(0, unSequencedProviders.length() - 1));
}
} while (allProviders.size() != 0); // 當提供對象列表不為空,則一直循環執行下去
}
}
ApplicationConfiguration
package org.apache.skywalking.oap.server.library.module;
import java.util.HashMap;
import java.util.Properties;
// OAP應用配置類
public class ApplicationConfiguration {
// 模塊定義配置map
private HashMap<String, ModuleConfiguration> modules = new HashMap<>();
// 模塊配置名列表
public String[] moduleList() {
return modules.keySet().toArray(new String[0]);
}
// 添加模塊定義配置
public ModuleConfiguration addModule(String moduleName) {
ModuleConfiguration newModule = new ModuleConfiguration();
modules.put(moduleName, newModule);
return newModule;
}
// 判斷指定模塊名是否存在模塊定義配置map中
public boolean has(String moduleName) {
return modules.containsKey(moduleName);
}
// 獲取模塊定義配置
public ModuleConfiguration getModuleConfiguration(String name) {
return modules.get(name);
}
// 模塊定義配置類
public static class ModuleConfiguration {
// 模塊提供對象map
private HashMap<String, ProviderConfiguration> providers = new HashMap<>();
private ModuleConfiguration() {
}
// 獲取模塊提供配置
public Properties getProviderConfiguration(String name) {
return providers.get(name).getProperties();
}
// 是否存在模塊提供配置
public boolean has(String name) {
return providers.containsKey(name);
}
// 添加模塊提供配置
public ModuleConfiguration addProviderConfiguration(String name, Properties properties) {
ProviderConfiguration newProvider = new ProviderConfiguration(properties);
providers.put(name, newProvider);
return this;
}
}
// 模塊提供配置類
public static class ProviderConfiguration {
// 模塊提供屬性
private Properties properties;
ProviderConfiguration(Properties properties) {
this.properties = properties;
}
private Properties getProperties() {
return properties;
}
}
}
ApplicationConfigLoader
package org.apache.skywalking.oap.server.starter.config;
import java.io.FileNotFoundException;
import java.io.Reader;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.util.PropertyPlaceholderHelper;
import org.apache.skywalking.oap.server.library.module.ApplicationConfiguration;
import org.apache.skywalking.oap.server.library.module.ProviderNotFoundException;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.ResourceUtils;
import org.yaml.snakeyaml.Yaml;
// application.yml加載類, 三層結構:模塊定義名.模塊提供名.屬性key
@Slf4j
public class ApplicationConfigLoader implements ConfigLoader<ApplicationConfiguration> {
// 當不配置模塊提供者時,使用"-"
private static final String DISABLE_SELECTOR = "-";
// 該字段選擇模塊提供者
private static final String SELECTOR = "selector";
private final Yaml yaml = new Yaml();
@Override
public ApplicationConfiguration load() throws ConfigFileNotFoundException {
ApplicationConfiguration configuration = new ApplicationConfiguration();
this.loadConfig(configuration);
this.overrideConfigBySystemEnv(configuration);
return configuration;
}
@SuppressWarnings("unchecked")
private void loadConfig(ApplicationConfiguration configuration) throws ConfigFileNotFoundException {
try {
Reader applicationReader = ResourceUtils.read("application.yml");
Map<String, Map<String, Object>> moduleConfig = yaml.loadAs(applicationReader, Map.class);
if (CollectionUtils.isNotEmpty(moduleConfig)) {
selectConfig(moduleConfig);
moduleConfig.forEach((moduleName, providerConfig) -> {
if (providerConfig.size() > 0) {
log.info("Get a module define from application.yml, module name: {}", moduleName);
ApplicationConfiguration.ModuleConfiguration moduleConfiguration = configuration.addModule(moduleName);
providerConfig.forEach((providerName, config) -> {
log.info("Get a provider define belong to {} module, provider name: {}", moduleName, providerName);
final Map<String, ?> propertiesConfig = (Map<String, ?>) config;
final Properties properties = new Properties();
if (propertiesConfig != null) {
propertiesConfig.forEach((propertyName, propertyValue) -> {
if (propertyValue instanceof Map) {
Properties subProperties = new Properties();
((Map) propertyValue).forEach((key, value) -> {
subProperties.put(key, value);
replacePropertyAndLog(key, value, subProperties, providerName);
});
properties.put(propertyName, subProperties);
} else {
properties.put(propertyName, propertyValue);
replacePropertyAndLog(propertyName, propertyValue, properties, providerName);
}
});
}
moduleConfiguration.addProviderConfiguration(providerName, properties);
});
} else {
log.warn("Get a module define from application.yml, but no provider define, use default, module name: {}", moduleName);
}
});
}
} catch (FileNotFoundException e) {
throw new ConfigFileNotFoundException(e.getMessage(), e);
}
}
private void replacePropertyAndLog(final Object propertyName, final Object propertyValue, final Properties target, final Object providerName) {
final String valueString = PropertyPlaceholderHelper.INSTANCE.replacePlaceholders(propertyValue + "", target);
if (valueString != null) {
if (valueString.trim().length() == 0) {
target.replace(propertyName, valueString);
log.info("Provider={} config={} has been set as an empty string", providerName, propertyName);
} else {
// Use YAML to do data type conversion.
final Object replaceValue = yaml.load(valueString);
if (replaceValue != null) {
target.replace(propertyName, replaceValue);
log.info("Provider={} config={} has been set as {}", providerName, propertyName, replaceValue.toString());
}
}
}
}
private void overrideConfigBySystemEnv(ApplicationConfiguration configuration) {
for (Map.Entry<Object, Object> prop : System.getProperties().entrySet()) {
overrideModuleSettings(configuration, prop.getKey().toString(), prop.getValue().toString());
}
}
private void selectConfig(final Map<String, Map<String, Object>> moduleConfiguration) {
final Set<String> modulesWithoutProvider = new HashSet<>();
for (final Map.Entry<String, Map<String, Object>> entry : moduleConfiguration.entrySet()) {
final String moduleName = entry.getKey();
final Map<String, Object> providerConfig = entry.getValue();
if (!providerConfig.containsKey(SELECTOR)) {
continue;
}
final String selector = (String) providerConfig.get(SELECTOR);
final String resolvedSelector = PropertyPlaceholderHelper.INSTANCE.replacePlaceholders(
selector, System.getProperties()
);
providerConfig.entrySet().removeIf(e -> !resolvedSelector.equals(e.getKey()));
if (!providerConfig.isEmpty()) {
continue;
}
if (!DISABLE_SELECTOR.equals(resolvedSelector)) {
throw new ProviderNotFoundException("no provider found for module " + moduleName + ", " + "if you're sure it's not required module and want to remove it, " + "set the selector to -");
}
// now the module can be safely removed
modulesWithoutProvider.add(moduleName);
}
moduleConfiguration.entrySet().removeIf(e -> {
final String module = e.getKey();
final boolean shouldBeRemoved = modulesWithoutProvider.contains(module);
if (shouldBeRemoved) {
log.info("Remove module {} without any provider", module);
}
return shouldBeRemoved;
});
}
private void overrideModuleSettings(ApplicationConfiguration configuration, String key, String value) {
int moduleAndConfigSeparator = key.indexOf('.');
if (moduleAndConfigSeparator <= 0) {
return;
}
String moduleName = key.substring(0, moduleAndConfigSeparator);
String providerSettingSubKey = key.substring(moduleAndConfigSeparator + 1);
ApplicationConfiguration.ModuleConfiguration moduleConfiguration = configuration.getModuleConfiguration(moduleName);
if (moduleConfiguration == null) {
return;
}
int providerAndConfigSeparator = providerSettingSubKey.indexOf('.');
if (providerAndConfigSeparator <= 0) {
return;
}
String providerName = providerSettingSubKey.substring(0, providerAndConfigSeparator);
String settingKey = providerSettingSubKey.substring(providerAndConfigSeparator + 1);
if (!moduleConfiguration.has(providerName)) {
return;
}
Properties providerSettings = moduleConfiguration.getProviderConfiguration(providerName);
if (!providerSettings.containsKey(settingKey)) {
return;
}
Object originValue = providerSettings.get(settingKey);
Class<?> type = originValue.getClass();
if (type.equals(int.class) || type.equals(Integer.class))
providerSettings.put(settingKey, Integer.valueOf(value));
else if (type.equals(String.class))
providerSettings.put(settingKey, value);
else if (type.equals(long.class) || type.equals(Long.class))
providerSettings.put(settingKey, Long.valueOf(value));
else if (type.equals(boolean.class) || type.equals(Boolean.class)) {
providerSettings.put(settingKey, Boolean.valueOf(value));
} else {
return;
}
log.info("The setting has been override by key: {}, value: {}, in {} provider of {} module through {}", settingKey, value, providerName, moduleName, "System.properties");
}
}
ConfigLoader
package org.apache.skywalking.oap.server.starter.config;
// 配置加載接口
public interface ConfigLoader<T> {
T load() throws ConfigFileNotFoundException;
}
OAPServerBootstrap
package org.apache.skywalking.oap.server.starter;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.RunningMode;
import org.apache.skywalking.oap.server.library.module.ApplicationConfiguration;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.starter.config.ApplicationConfigLoader;
import org.apache.skywalking.oap.server.starter.config.ConfigLoader;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
// OAP啟動類,加載配置文件,初始化模塊
@Slf4j
public class OAPServerBootstrap {
public static void start() {
String mode = System.getProperty("mode");
// 啟動模式
RunningMode.setMode(mode);
ApplicationConfigLoader configLoader = new ApplicationConfigLoader();
ModuleManager manager = new ModuleManager();
try {
// 從配置文件中加載配置
ApplicationConfiguration applicationConfiguration = configLoader.load();
// 初始化模塊
manager.init(applicationConfiguration);
// 將啟動時間發送給Telemetry
manager.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class)
.createGauge("uptime", "oap server start up time", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE)
// Set uptime to second
.setValue(System.currentTimeMillis() / 1000d);
if (RunningMode.isInitMode()) {
log.info("OAP starts up in init mode successfully, exit now...");
System.exit(0);
}
} catch (Throwable t) {
log.error(t.getMessage(), t);
System.exit(1);
}
}
}
OAPServerStartUp
package org.apache.skywalking.oap.server.starter;
// OAP啟動類
public class OAPServerStartUp {
public static void main(String[] args) {
OAPServerBootstrap.start();
}
}
以 Skywalking OAP
啟動流程分析模塊加載機制
時序圖
案例:存儲模塊加載分析
配置文件
從 application.yml
配置文件,可以看出。
模塊是以三層結構來定義的:
- 第一層:模塊定義名
- 第二層:模塊提供名/
selector
- 第三層:模塊提供配置信息/
selector
選擇的模塊提供配置
storage:
selector: ${SW_STORAGE:h2}
elasticsearch:
nameSpace: ${SW_NAMESPACE:""}
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
# etc...
elasticsearch7:
nameSpace: ${SW_NAMESPACE:""}
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
# etc...
h2:
driver: ${SW_STORAGE_H2_DRIVER:org.h2.jdbcx.JdbcDataSource}
url: ${SW_STORAGE_H2_URL:jdbc:h2:mem:skywalking-oap-db;DB_CLOSE_DELAY=-1}
# etc...
mysql:
properties:
jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:3306/swtest"}
dataSource.user: ${SW_DATA_SOURCE_USER:root}
dataSource.password: ${SW_DATA_SOURCE_PASSWORD:root@1234}
# etc...
tidb:
properties:
jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:4000/tidbswtest"}
dataSource.user: ${SW_DATA_SOURCE_USER:root}
dataSource.password: ${SW_DATA_SOURCE_PASSWORD:""}
# etc...
influxdb:
url: ${SW_STORAGE_INFLUXDB_URL:http://localhost:8086}
user: ${SW_STORAGE_INFLUXDB_USER:root}
password: ${SW_STORAGE_INFLUXDB_PASSWORD:}
# etc...
加載配置
經過 org.apache.skywalking.oap.server.starter.config.ApplicationConfigLoader#load
的調用, org.apache.skywalking.oap.server.starter.OAPServerBootstrap#start
獲取到了所有需要加載的模塊,其中包括存儲模塊。
在 org.apache.skywalking.oap.server.starter.config.ApplicationConfigLoader#selectConfig
也通過 storage.selector=h2
,存儲模塊只保留了 h2
的配置信息:
storage:
h2:
driver: ${SW_STORAGE_H2_DRIVER:org.h2.jdbcx.JdbcDataSource}
url: ${SW_STORAGE_H2_URL:jdbc:h2:mem:skywalking-oap-db;DB_CLOSE_DELAY=-1}
# etc...
准備階段
在org.apache.skywalking.oap.server.library.module.ModuleManager#init
中,通過 SPI
加載了模塊定義對象,存儲模塊對應的定義類如下:
PS:可以看到定義了大量 Service
接口
package org.apache.skywalking.oap.server.core.storage;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IBrowserLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
* StorageModule provides the capabilities(services) to interact with the database. With different databases, this
* module could have different providers, such as currently, H2, MySQL, ES, TiDB.
*/
public class StorageModule extends ModuleDefine {
public static final String NAME = "storage";
public StorageModule() {
super(NAME);
}
@Override
public Class[] services() {
return new Class[]{
IBatchDAO.class,
StorageDAO.class,
IHistoryDeleteDAO.class,
INetworkAddressAliasDAO.class,
ITopologyQueryDAO.class,
IMetricsQueryDAO.class,
ITraceQueryDAO.class,
IMetadataQueryDAO.class,
IAggregationQueryDAO.class,
IAlarmQueryDAO.class,
ITopNRecordsQueryDAO.class,
ILogQueryDAO.class,
IProfileTaskQueryDAO.class,
IProfileTaskLogQueryDAO.class,
IProfileThreadSnapshotQueryDAO.class,
UITemplateManagementDAO.class,
IBrowserLogQueryDAO.class
};
}
}
同時也會調用 org.apache.skywalking.oap.server.library.module.ModuleDefine#prepare
進入准備階段
String[] moduleNames = applicationConfiguration.moduleList();
ServiceLoader<ModuleDefine> moduleServiceLoader = ServiceLoader.load(ModuleDefine.class);
ServiceLoader<ModuleProvider> moduleProviderLoader = ServiceLoader.load(ModuleProvider.class);
LinkedList<String> moduleList = new LinkedList<>(Arrays.asList(moduleNames));
for (ModuleDefine module : moduleServiceLoader) {
for (String moduleName : moduleNames) {
if (moduleName.equals(module.name())) {
module.prepare(this, applicationConfiguration.getModuleConfiguration(moduleName), moduleProviderLoader);
loadedModules.put(moduleName, module);
moduleList.remove(moduleName);
}
}
}
在 org.apache.skywalking.oap.server.library.module.ModuleDefine#prepare
會通過傳入的配置,只匹配上配置文件選擇的模塊提供對象。
for (ModuleProvider provider : moduleProviderLoader) {
if (!configuration.has(provider.name())) {
continue;
}
if (provider.module().equals(getClass())) {
if (loadedProvider == null) {
loadedProvider = provider;
loadedProvider.setManager(moduleManager);
loadedProvider.setModuleDefine(this);
} else {
throw new DuplicateProviderException(this.name() + " module has one " + loadedProvider.name() + "[" + loadedProvider.getClass().getName()
+ "] provider already, " + provider.name() + "[" + provider.getClass().getName() + "] is defined as 2nd provider.");
}
}
}
if (loadedProvider == null) {
throw new ProviderNotFoundException(this.name() + " module no provider found.");
}
LOGGER.info("Prepare the {} provider in {} module.", loadedProvider.name(), this.name());
try {
copyProperties(loadedProvider.createConfigBeanIfAbsent(), configuration.getProviderConfiguration(loadedProvider.name()), this.name(), loadedProvider.name());
} catch (IllegalAccessException e) {
throw new ModuleConfigException(this.name() + " module config transport to config bean failure.", e);
}
loadedProvider.prepare();
例如“配置文件”一節選擇的h2
,則加載的提供類為 org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.H2StorageProvider
它的 prepare
方法如下,可以看到:注冊了所有 StorageModule
聲明的 Service
接口
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
Properties settings = new Properties();
settings.setProperty("dataSourceClassName", config.getDriver());
settings.setProperty("dataSource.url", config.getUrl());
settings.setProperty("dataSource.user", config.getUser());
settings.setProperty("dataSource.password", config.getPassword());
h2Client = new JDBCHikariCPClient(settings);
this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(h2Client));
this.registerServiceImplementation(
StorageDAO.class,
new H2StorageDAO(
getManager(), h2Client, config.getMaxSizeOfArrayColumn(), config.getNumOfSearchableValuesPerTag())
);
this.registerServiceImplementation(
INetworkAddressAliasDAO.class, new H2NetworkAddressAliasDAO(h2Client));
this.registerServiceImplementation(ITopologyQueryDAO.class, new H2TopologyQueryDAO(h2Client));
this.registerServiceImplementation(IMetricsQueryDAO.class, new H2MetricsQueryDAO(h2Client));
this.registerServiceImplementation(
ITraceQueryDAO.class, new H2TraceQueryDAO(
getManager(),
h2Client,
config.getMaxSizeOfArrayColumn(),
config.getNumOfSearchableValuesPerTag()
));
this.registerServiceImplementation(IBrowserLogQueryDAO.class, new H2BrowserLogQueryDAO(h2Client));
this.registerServiceImplementation(
IMetadataQueryDAO.class, new H2MetadataQueryDAO(h2Client, config.getMetadataQueryMaxSize()));
this.registerServiceImplementation(IAggregationQueryDAO.class, new H2AggregationQueryDAO(h2Client));
this.registerServiceImplementation(IAlarmQueryDAO.class, new H2AlarmQueryDAO(h2Client));
this.registerServiceImplementation(
IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(h2Client));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new H2TopNRecordsQueryDAO(h2Client));
this.registerServiceImplementation(
ILogQueryDAO.class,
new H2LogQueryDAO(
h2Client,
getManager(),
config.getMaxSizeOfArrayColumn(),
config.getNumOfSearchableValuesPerTag()
)
);
this.registerServiceImplementation(IProfileTaskQueryDAO.class, new H2ProfileTaskQueryDAO(h2Client));
this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new H2ProfileTaskLogQueryDAO(h2Client));
this.registerServiceImplementation(
IProfileThreadSnapshotQueryDAO.class, new H2ProfileThreadSnapshotQueryDAO(h2Client));
this.registerServiceImplementation(UITemplateManagementDAO.class, new H2UITemplateManagementDAO(h2Client));
}
啟動階段
在org.apache.skywalking.oap.server.library.module.ModuleManager#init
通過調用 org.apache.skywalking.oap.server.library.module.BootstrapFlow#start
進入啟動階段
void start(
ModuleManager moduleManager) throws ModuleNotFoundException, ServiceNotProvidedException, ModuleStartException {
for (ModuleProvider provider : startupSequence) {
LOGGER.info("start the provider {} in {} module.", provider.name(), provider.getModuleName());
provider.requiredCheck(provider.getModule().services());
provider.start();
}
}
例如“配置文件”一節選擇的h2
,則加載的提供類為 org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.H2StorageProvider
它的 start
方法如下,可以看到:啟動 h2client
並監聽 ModelCreator
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
final ConfigService configService = getManager().find(CoreModule.NAME)
.provider()
.getService(ConfigService.class);
final int numOfSearchableTracesTags = configService.getSearchableTracesTags().split(Const.COMMA).length;
if (numOfSearchableTracesTags * config.getNumOfSearchableValuesPerTag() > config.getMaxSizeOfArrayColumn()) {
throw new ModuleStartException("Size of searchableTracesTags[" + numOfSearchableTracesTags
+ "] * numOfSearchableValuesPerTag[" + config.getNumOfSearchableValuesPerTag()
+ "] > maxSizeOfArrayColumn[" + config.getMaxSizeOfArrayColumn()
+ "]. Potential out of bound in the runtime.");
}
final int numOfSearchableLogsTags = configService.getSearchableLogsTags().split(Const.COMMA).length;
if (numOfSearchableLogsTags * config.getNumOfSearchableValuesPerTag() > config.getMaxSizeOfArrayColumn()) {
throw new ModuleStartException("Size of searchableLogsTags[" + numOfSearchableLogsTags
+ "] * numOfSearchableValuesPerTag[" + config.getNumOfSearchableValuesPerTag()
+ "] > maxSizeOfArrayColumn[" + config.getMaxSizeOfArrayColumn()
+ "]. Potential out of bound in the runtime.");
}
MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class);
HealthCheckMetrics healthChecker = metricCreator.createHealthCheckerGauge(
"storage_h2", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
h2Client.registerChecker(healthChecker);
try {
h2Client.connect();
H2TableInstaller installer = new H2TableInstaller(
h2Client, getManager(), config.getMaxSizeOfArrayColumn(), config.getNumOfSearchableValuesPerTag());
getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer);
} catch (StorageException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
完成后通知階段
在org.apache.skywalking.oap.server.library.module.ModuleManager#init
通過調用 org.apache.skywalking.oap.server.library.module.BootstrapFlow#notifyAfterCompleted
進入完成后通知階段
void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
for (ModuleProvider provider : startupSequence) {
provider.notifyAfterCompleted();
}
}
例如“配置文件”一節選擇的h2
,則加載的提供類為 org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.H2StorageProvider
它的 notifyAfterCompleted
方法如下,可以看到:不需要做什么
@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
}
H2StorageProvider
完整源碼
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2;
// etc...
/**
* H2 Storage provider is for demonstration and preview only. I will find that haven't implemented several interfaces,
* because not necessary, and don't consider about performance very much.
* <p>
* If someone wants to implement SQL-style database as storage, please just refer the logic.
*/
@Slf4j
public class H2StorageProvider extends ModuleProvider {
private H2StorageConfig config;
private JDBCHikariCPClient h2Client;
public H2StorageProvider() {
config = new H2StorageConfig();
}
@Override
public String name() {
return "h2";
}
@Override
public Class<? extends ModuleDefine> module() {
return StorageModule.class;
}
@Override
public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
Properties settings = new Properties();
settings.setProperty("dataSourceClassName", config.getDriver());
settings.setProperty("dataSource.url", config.getUrl());
settings.setProperty("dataSource.user", config.getUser());
settings.setProperty("dataSource.password", config.getPassword());
h2Client = new JDBCHikariCPClient(settings);
this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(h2Client));
this.registerServiceImplementation(
StorageDAO.class,
new H2StorageDAO(
getManager(), h2Client, config.getMaxSizeOfArrayColumn(), config.getNumOfSearchableValuesPerTag())
);
this.registerServiceImplementation(
INetworkAddressAliasDAO.class, new H2NetworkAddressAliasDAO(h2Client));
this.registerServiceImplementation(ITopologyQueryDAO.class, new H2TopologyQueryDAO(h2Client));
this.registerServiceImplementation(IMetricsQueryDAO.class, new H2MetricsQueryDAO(h2Client));
this.registerServiceImplementation(
ITraceQueryDAO.class, new H2TraceQueryDAO(
getManager(),
h2Client,
config.getMaxSizeOfArrayColumn(),
config.getNumOfSearchableValuesPerTag()
));
this.registerServiceImplementation(IBrowserLogQueryDAO.class, new H2BrowserLogQueryDAO(h2Client));
this.registerServiceImplementation(
IMetadataQueryDAO.class, new H2MetadataQueryDAO(h2Client, config.getMetadataQueryMaxSize()));
this.registerServiceImplementation(IAggregationQueryDAO.class, new H2AggregationQueryDAO(h2Client));
this.registerServiceImplementation(IAlarmQueryDAO.class, new H2AlarmQueryDAO(h2Client));
this.registerServiceImplementation(
IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(h2Client));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new H2TopNRecordsQueryDAO(h2Client));
this.registerServiceImplementation(
ILogQueryDAO.class,
new H2LogQueryDAO(
h2Client,
getManager(),
config.getMaxSizeOfArrayColumn(),
config.getNumOfSearchableValuesPerTag()
)
);
this.registerServiceImplementation(IProfileTaskQueryDAO.class, new H2ProfileTaskQueryDAO(h2Client));
this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new H2ProfileTaskLogQueryDAO(h2Client));
this.registerServiceImplementation(
IProfileThreadSnapshotQueryDAO.class, new H2ProfileThreadSnapshotQueryDAO(h2Client));
this.registerServiceImplementation(UITemplateManagementDAO.class, new H2UITemplateManagementDAO(h2Client));
}
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
final ConfigService configService = getManager().find(CoreModule.NAME)
.provider()
.getService(ConfigService.class);
final int numOfSearchableTracesTags = configService.getSearchableTracesTags().split(Const.COMMA).length;
if (numOfSearchableTracesTags * config.getNumOfSearchableValuesPerTag() > config.getMaxSizeOfArrayColumn()) {
throw new ModuleStartException("Size of searchableTracesTags[" + numOfSearchableTracesTags
+ "] * numOfSearchableValuesPerTag[" + config.getNumOfSearchableValuesPerTag()
+ "] > maxSizeOfArrayColumn[" + config.getMaxSizeOfArrayColumn()
+ "]. Potential out of bound in the runtime.");
}
final int numOfSearchableLogsTags = configService.getSearchableLogsTags().split(Const.COMMA).length;
if (numOfSearchableLogsTags * config.getNumOfSearchableValuesPerTag() > config.getMaxSizeOfArrayColumn()) {
throw new ModuleStartException("Size of searchableLogsTags[" + numOfSearchableLogsTags
+ "] * numOfSearchableValuesPerTag[" + config.getNumOfSearchableValuesPerTag()
+ "] > maxSizeOfArrayColumn[" + config.getMaxSizeOfArrayColumn()
+ "]. Potential out of bound in the runtime.");
}
MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class);
HealthCheckMetrics healthChecker = metricCreator.createHealthCheckerGauge(
"storage_h2", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
h2Client.registerChecker(healthChecker);
try {
h2Client.connect();
H2TableInstaller installer = new H2TableInstaller(
h2Client, getManager(), config.getMaxSizeOfArrayColumn(), config.getNumOfSearchableValuesPerTag());
getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer);
} catch (StorageException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
}
@Override
public String[] requiredModules() {
return new String[] {CoreModule.NAME};
}
}
總結
Skywalking
提供的模塊機制是非常優良的設計,在工作中,如果有多個 N
選 1
的場景,是可以借鑒它的設計的。
參考文檔
分享並記錄所學所見