Nacos服务注册原理分析 - 小码code
source link: https://www.cnblogs.com/jeremylai7/p/17096211.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
在分布式服务中,原来的单体服务会被拆分成一个个微服务,服务注册实例到注册中心,服务消费者通过注册中心获取实例列表,直接请求调用服务。
服务是如何注册到注册中心,服务如果挂了,服务是如何检测?带着这些问题,我们从源码上对服务注册进行简单的源码分析。
版本 2.1.1
Nacos Server:2.1.1
spring-cloud-starter-alibaba:2.1.1.RELEASE
spring-boot:2.1.1.RELEASE
方便统一版本,客户端和服务端版本号都为
2.1.1
。
启动nacos
服务注册和发现需要添加maven
依赖:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>${latest.version}</version>
</dependency>
根据maven
依赖找到对应的spring.factories
文件:
在spring.factories
文件里找到启动配置类信息,SpringBoot
服务启动时会将这些配置类信息注入到bean
容器中。
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration,\
com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\
com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientAutoConfiguration,\
com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration
服务注册的核心配置类为:NacosDiscoveryAutoConfiguration
,该类配置三个bean
对象:
NacosServiceRegistry
NacosRegistration
NacosAutoServiceRegistration
NacosAutoServiceRegistration
NacosAutoServiceRegistration
继承了抽象类AbstractAutoServiceRegistration
。AbstractAutoServiceRegistration
抽象类又实现了ApplicationListener
接口。
实现ApplicationListener
接口的方法,会在Spring
容器初始化完成之后调用onApplicationEvent
方法:
public void onApplicationEvent(WebServerInitializedEvent event) {
bind(event);
}
调用bind
方法:
public void bind(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();
if (context instanceof ConfigurableWebServerApplicationContext) {
if ("management".equals(((ConfigurableWebServerApplicationContext) context)
.getServerNamespace())) {
return;
}
}
this.port.compareAndSet(0, event.getWebServer().getPort());
// 调用 start 方法
this.start();
}
调用了start
方法:
public void start() {
if (!isEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("Discovery Lifecycle disabled. Not starting");
}
return;
}
if (!this.running.get()) {
this.context.publishEvent(
new InstancePreRegisteredEvent(this, getRegistration()));
register();
if (shouldRegisterManagement()) {
registerManagement();
}
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, getConfiguration()));
this.running.compareAndSet(false, true);
}
}
调用了register
方法,最终调用的是NacosServiceRegistry
类的register
方法。
NacosServiceRegistry
根据上文可知,服务器启动后调用NacosServiceRegistry
类的register
方法,该方法实现将实例注册到服务端:
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();
// 创建实例
Instance instance = getNacosInstanceFromRegistration(registration);
try {
// 注册实例
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
}
}
创建实例,然后通过namingService.registerInstance
方法注册实例,然后查看registerInstance
方法:
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
if (instance.isEphemeral()) {
// 封装心跳包
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
long instanceInterval = instance.getInstanceHeartBeatInterval();
beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
// 发送心跳包
beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}
// 发送实例
serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
registerInstance
主要做两件事:
- 发送心跳包
beatReactor.addBeatInfo
使用定时服务,每隔5s
向服务端发送一次心跳请求,通过http
请求发送心跳信息,路径为/v1/ns/instance/beat
。
心跳请求定时任务使用线程池ScheduledThreadPoolExecutor.schedule()
,而该方法只会调用一次,定时任务的实现是在每次请求任务只会再调用一次ScheduledThreadPoolExecutor.schedule()
,
简单说就是nacos
在发送心跳的时候,会调用schedule
方法,在schedule
要执行的任务中,如果正常发送完心跳,会再次调用schedule
方法。
那为什么不直接调用周期执行的线程池ScheduledThreadPoolExecutor.scheduleAtFixedRate()
?可能是由于发送心跳服务发生异常后,定时任务还会继续执行,但是周期执行的线程池遇到报错后也不会重复调用执行的任务。
线程任务
BeatTask
的run
方法,,每次执行会先判断isStopped
,如果是false
,说明心跳停止,就不会触发下次执行任务。如果使用定时任务scheduleAtFixedRate
,即使心跳停止还会继续执行任务,造成资源不必要浪费。
registerService
主要封装实例信息,比如ip
、port
、servicename
,将这些信息通过http
请求发送给服务端。路径为/v1/ns/instance
。
根据上面流程,查看以下的流程图:
服务端就是注册中心,服务注册到注册中心,在https://github.com/alibaba/nacos/releases/tag/2.1.1
下载源码部署到本地,方便调式和查看,部署方式详见我的另外一篇文章Nacos 源码环境搭建。
服务端主要接收两个信息:心跳包和实例信息。
客户端向服务请求的路径为/v1/ns/instance/beat
,对应的服务端为InstanceController
类的beat
方法:
@PutMapping("/beat")
@Secured(action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
ObjectNode result = JacksonUtils.createEmptyJsonNode();
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
RsInfo clientBeat = null;
// 判断是否有心跳,存在心跳就转成RsInfo
if (StringUtils.isNotBlank(beat)) {
clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
}
String clusterName = WebUtils
.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
if (clientBeat != null) {
if (StringUtils.isNotBlank(clientBeat.getCluster())) {
clusterName = clientBeat.getCluster();
} else {
// fix #2533
clientBeat.setCluster(clusterName);
}
ip = clientBeat.getIp();
port = clientBeat.getPort();
}
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}, namespaceId: {}", clientBeat,
serviceName, namespaceId);
// 获取实例信息
BeatInfoInstanceBuilder builder = BeatInfoInstanceBuilder.newBuilder();
builder.setRequest(request);
int resultCode = getInstanceOperator()
.handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat, builder);
result.put(CommonParams.CODE, resultCode);
// 下次发送心跳包间隔
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL,
getInstanceOperator().getHeartBeatInterval(namespaceId, serviceName, ip, port, clusterName));
result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
return result;
}
在handleBeat
方法中执行线程任务ClientBeatProcessorV2
的run
方法,延长lastHeartBeatTime
时间。注册中心会定时查询实例,当前时间 - lastHeartBeatTime
> 设置时间(默认15秒),就标记实例为不健康实例。如果心跳实例不健康,发送通知给订阅方,变更实例。
服务端在
15
秒没有收到心跳包会将实例设置为不健康,在30
秒没有收到心跳包会将临时实例移除掉。
客户端请求的地址是/nacos/v1/ns/instance
, 对应的是服务端是在InstanceController
类。找到类上对应的post
请求方法上。
注册流程:
InstanceController#register ——>InstanceOperatorClientImpl#registerInstance ——>ClientOperationServiceProxy#registerInstance ——>EphemeralClientOperationServiceImpl#registerInstance
创建 Service
服务注册后,将服务存储在一个双层map
集合中:
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
通过是否存在ephemeral
,true
,走AP
模式,否则走CP
模式。
Nacos
默认就是采用的AP
模式使用Distro
协议实现。实现的接口是EphemeralConsistencyService
对节点信息的持久化主要是调用put
方法,
会先写入到DataStore
中:
public void onPut(String key, Record value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
// 数据持久化到缓存中
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
notifier.addTask(key, DataOperation.CHANGE);
}
- 从依赖上找到需要启动的是要加载的服务注册类
NacosDiscoveryAutoConfiguration
,主要配置三个对象NacosServiceRegistry
NacosRegistration
NacosAutoServiceRegistration
NacosServiceRegistry
类的register
方法,封装实例和心跳信息- 通过
http
请求,定时发送发送心跳包,默认时间间隔是5
秒。 - 通过
http
请求,发送实例信息。
- 通过
- 服务端
- 接收到心跳请求,更新心跳包最新时间。服务端在
15
秒没有收到心跳包会将实例设为不健康,在30
秒没有收到心跳包会将临时实例移除掉。 - 接收到服务注册接口,通过
ephemeral
判断是否走AP
还是走CP
,AP
模式使用Distro
协议。通过调用EphemeralConsistencyService
接口实现,持久化实例信息。
- 接收到心跳请求,更新心跳包最新时间。服务端在
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK