1
Apache SkyWalking 告警动态配置源码简析
source link: https://blog.51cto.com/u_6740480/5329040
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
Apache SkyWalking 告警动态配置源码简析
原创AlarmModuleProvider
实现了ModuleProvider
接口,通过SPI的方式被加载进来。AlarmModuleProvider
的prepare方法先被调用,做一些预处理:
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
Reader applicationReader;
try {
applicationReader = ResourceUtils.read("alarm-settings.yml");
} catch (FileNotFoundException e) {
throw new ModuleStartException("can't load alarm-settings.yml", e);
}
//先从文件中,读取默认的配置,此处还没有加载动态配置。
RulesReader reader = new RulesReader(applicationReader);
Rules rules = reader.readRules();
//创建一个AlarmRulesWatcher实例,这个实例用于监控和转换动态配置。
alarmRulesWatcher = new AlarmRulesWatcher(rules, this);
//创建一个NotifyHandler实例,这个实例用于处理被触发的告警。
notifyHandler = new NotifyHandler(alarmRulesWatcher);
notifyHandler.init(new AlarmStandardPersistence());
//注册到服务实现中。
this.registerServiceImplementation(MetricsNotify.class, notifyHandler);
}
随后,AlarmModuleProvider
的start方法被调用:
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
//获取动态配置服务,目前(8.2.0)支持的动态配置有apollo, consul, etcd, k8s configmap, nacos, zookeeper, grpc
DynamicConfigurationService dynamicConfigurationService = getManager().find(ConfigurationModule.NAME)
.provider()
.getService(
DynamicConfigurationService.class);
//把之前的AlarmRulesWatcher实例,注册到动态配置服务中
dynamicConfigurationService.registerConfigChangeWatcher(alarmRulesWatcher);
}
registerConfigChangeWatcher
方法的源码在ConfigWatcherRegister
抽象类中:
@Override
synchronized public void registerConfigChangeWatcher(ConfigChangeWatcher watcher) {
if (isStarted) {
throw new IllegalStateException("Config Register has been started. Can't register new watcher.");
}
//利用传入的AlarmRulesWatcher实例,创建一个WatcherHolder实例,其实是对AlarmRulesWatcher实例的再次封装,并格式化好key为alarm.default.alarm-settings。
WatcherHolder holder = new WatcherHolder(watcher);
if (register.containsKey(holder.getKey())) {
throw new IllegalStateException("Duplicate register, watcher=" + watcher);
}
//每一个不同的key对应不同的WatcherHolder实例,也就是不同动态配置对应不用的处理办法。
register.put(holder.getKey(), holder);
}
随后,ConfigWatcherRegister
抽象类的start
方法被调用:
public void start() {
isStarted = true;
//同步动态配置
configSync();
LOGGER.info("Current configurations after the bootstrap sync." + LINE_SEPARATOR + register.toString());
//异步地每隔一段时间做一次动态配置的同步
Executors.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(
new RunnableWithExceptionProtection(
this::configSync,
t -> LOGGER.error("Sync config center error.", t)
), syncPeriod, syncPeriod, TimeUnit.SECONDS);
}
再看一下同步动态配置的configSync
方法:
void configSync() {
//读取所有注册进来的动态配置,包括告警的配置。
Optional<ConfigTable> configTable = readConfig(register.keys());
// 如果没有检测到任何配置的更改,configTable可能为null。
configTable.ifPresent(config -> {
config.getItems().forEach(item -> {
String itemName = item.getName();
//获取到配置对应的WatcherHolder实例
WatcherHolder holder = register.get(itemName);
if (holder != null) {
ConfigChangeWatcher watcher = holder.getWatcher();
String newItemValue = item.getValue();
if (newItemValue == null) {
if (watcher.value() != null) {
// 如果新的配置为null,则发送删除配置的消息类型。
watcher.notify(
new ConfigChangeWatcher.ConfigChangeEvent(null, ConfigChangeWatcher.EventType.DELETE));
} else {
// 如果不调用notify方法,则保持配置为空。
}
} else {
if (!newItemValue.equals(watcher.value())) {
watcher.notify(new ConfigChangeWatcher.ConfigChangeEvent(
newItemValue,
ConfigChangeWatcher.EventType.MODIFY
));
} else {
// 如果不调用notify方法,则保持在相同的配置。
}
}
} else {
LOGGER.warn("Config {} from configuration center, doesn't match any watcher, ignore.", itemName);
}
});
LOGGER.trace("Current configurations after the sync." + LINE_SEPARATOR + register.toString());
});
}
注:本文以SkyWalking的8.2.0版本为例进行介绍,如果版本不同会略有差异。
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK