7

nacos注册中心单节点ap架构源码解析 - bei_er

 1 year ago
source link: https://www.cnblogs.com/yuanbeier/p/17017564.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

一、注册流程

单nacos节点流程图如下:

1105368-20221231231425315-803351492.png

流程图可以知,Nacos注册流程包括客户端的服务注册、服务实例列表拉取、定时心跳任务;以及服务端的定时检查服务实例任务、服务实例更新推送5个功能。

服务注册:当客户端启动的时候会根据当前微服务的配置信息把微服务注册到nacos服务端。

服务实例列表拉取:当客户端启动的时候从nacos服务端获取当前服务的名称已经注册的实例数据,并把这些实例数据缓存在客户端的serviceInfoMap 对象中。

定时心跳任务:当客户端向nacos服务注册临时实例对象的时候,会创建一个延期的任务去往服务端发送心跳信息。如果发送心跳信息成功,则又会创建一个延期任务往服务端注册心跳信息,一直重复该逻辑。nacos服务端接收到客户端的心跳信息就是更新客户端实例的最后心跳时间。该时间用来判断实例是否健康和是否需要删除。

定时检查服务实例任务:nacos服务端在创建空服务对象的时候会通过线程池来定时执行检查服务,其主要逻辑为判断当前时间和最后心跳时间之差是否大于健康超时时间和删除实例超时时间,如果大于,则更新实例的健康状态和删除当前实例。定时执行的规则为5秒之后执行检查,并且每次执行完检查之后,5秒之后再次执行检查。

服务实例更新推送:当有客户端更新实例对象时,服务端会先获取该客户端的服务名称下所有已经注册的客户端实例,并会针每一个客户端发送一个更新serviceinfo的udp消息,客户端监听收到nacos服务端发送的udp数据后进行本地缓存的更新。

二、客户端

一、服务注册

根据spring-cloud-starter-alibaba-nacos-discovery的spring.factories文件,找到服务注册启动配置类。

1105368-20221231231434875-484829591.png

spring.factories文件内容为如下,

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\
  com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\
  com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
  com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration,\
  com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientConfiguration,\
  com.alibaba.cloud.nacos.discovery.reactive.NacosReactiveDiscoveryClientConfiguration,\
  com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration,\
  com.alibaba.cloud.nacos.NacosServiceAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
  com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration

根据名称判断可以得出 NacosServiceRegistryAutoConfiguration 为服务注册启动配置类,源码如下

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
		matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
		AutoServiceRegistrationAutoConfiguration.class,
		NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {

	@Bean
	public NacosServiceRegistry nacosServiceRegistry(
			NacosDiscoveryProperties nacosDiscoveryProperties) {
		return new NacosServiceRegistry(nacosDiscoveryProperties);
	}

	@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	public NacosRegistration nacosRegistration(
			ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,
			NacosDiscoveryProperties nacosDiscoveryProperties,
			ApplicationContext context) {
		return new NacosRegistration(registrationCustomizers.getIfAvailable(),
				nacosDiscoveryProperties, context);
	}

	@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	public NacosAutoServiceRegistration nacosAutoServiceRegistration(
			NacosServiceRegistry registry,
			AutoServiceRegistrationProperties autoServiceRegistrationProperties,
			NacosRegistration registration) {
		return new NacosAutoServiceRegistration(registry,
				autoServiceRegistrationProperties, registration);
	}

关键类 NacosAutoServiceRegistration 的类图结构如下

1105368-20221231231445701-1956026393.png

上图可知,NacosAutoServiceRegistration 实现了 ApplicationListener接口,该监听器会在SpringBoot启动的时候会自动调用 onApplicationEvent方法,onApplicationEvent具体实现方法如下

public void onApplicationEvent(WebServerInitializedEvent event) {
    this.bind(event);
}

@Deprecated
public void bind(WebServerInitializedEvent event) {
    ApplicationContext context = event.getApplicationContext();
    if (!(context instanceof ConfigurableWebServerApplicationContext) || !"management".equals(((ConfigurableWebServerApplicationContext)context).getServerNamespace())) {
        this.port.compareAndSet(0, event.getWebServer().getPort());
        // 具体的启动方法
        this.start();
    }
}

具体的启动方法this.start();方法的代码如下,

public void start() {
    if (!this.isEnabled()) {
        if (logger.isDebugEnabled()) {
            logger.debug("Discovery Lifecycle disabled. Not starting");
        }

    } else {
        if (!this.running.get()) {
            this.context.publishEvent(new InstancePreRegisteredEvent(this, this.getRegistration()));
            // 关键逻辑
            this.register();
            if (this.shouldRegisterManagement()) {
                this.registerManagement();
            }

            this.context.publishEvent(new InstanceRegisteredEvent(this, this.getConfiguration()));
            this.running.compareAndSet(false, true);
        }

    }

关键逻辑为this.register();方法代码如下

protected void register() {
    if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {
        log.debug("Registration disabled.");
        return;
    }
    if (this.registration.getPort() < 0) {
        this.registration.setPort(getPort().get());
    }
    super.register();
}

关键逻辑为super.register();方法代码如下,

protected void register() {
    this.serviceRegistry.register(this.getRegistration());
}

关键逻辑为this.serviceRegistry.register方法代码如下,

@Override
public void register(Registration registration) {

    if (StringUtils.isEmpty(registration.getServiceId())) {
        log.warn("No service to register for nacos client...");
        return;
    }
	// 根据配置属性构建NamingService对象
    NamingService namingService = namingService();
    // 获取服务名,默认为 ${spring.application.name}
    String serviceId = registration.getServiceId();
    // 获取组名 ,默认为 DEFAULT_GROUP
    String group = nacosDiscoveryProperties.getGroup();

    // 创建注册实例
    Instance instance = getNacosInstanceFromRegistration(registration);

    try {
        // 发起注册
        namingService.registerInstance(serviceId, group, instance);
        log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
                 instance.getIp(), instance.getPort());
    }
    catch (Exception e) {
        log.error("nacos registry, {} register failed...{},", serviceId,
                  registration.toString(), e);
        // rethrow a RuntimeException if the registration is failed.
        // issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
        rethrowRuntimeException(e);
    }
}

先通过getNacosInstanceFromRegistration方法创建实例对象,getNacosInstanceFromRegistration代码如下,

private Instance getNacosInstanceFromRegistration(Registration registration) {
    Instance instance = new Instance();
    // 获取服务ip
    instance.setIp(registration.getHost());
    // 获取服务
    instance.setPort(registration.getPort());
    // 获取权重
    instance.setWeight(nacosDiscoveryProperties.getWeight());
    // 获取集群名称
    instance.setClusterName(nacosDiscoveryProperties.getClusterName());
  
    instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled());
    // 获取元数据
    instance.setMetadata(registration.getMetadata());
    // 获取是否为临时实例
    instance.setEphemeral(nacosDiscoveryProperties.isEphemeral());
    return instance;
}

然后通过namingService.registerInstance方法发起注册,registerInstance方法的代码如下,

public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    // 检查 实例是否合法 
    // heart beat timeout must(默认15秒) < heart beat interval (默认5秒)抛异常
    // ip delete timeout must(默认30 秒) < heart beat interval(默认5秒)抛异常
    NamingUtils.checkInstanceIsLegal(instance);
    // 构建 groupName@@serviceName
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // 如果是临时实例,则创建心跳信息,定时给nacos服务发送
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = this.beatReactor.buildBeatInfo(groupedServiceName, instance);
        this.beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
	// 向 nacos-service 注册实例
    this.serverProxy.registerService(groupedServiceName, groupName, instance);
}

先检查实例是否合法,然后构建服务名称,规则为groupName@@serviceName。通过this.serverProxy.registerService方法向 nacos-service 注册实例,代码如下,

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,instance);

    final Map<String, String> params = new HashMap<String, String>(16);
    //设置 namespaceId
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    //设置 serviceName
    params.put(CommonParams.SERVICE_NAME, serviceName);
    //设置 groupName
    params.put(CommonParams.GROUP_NAME, groupName);
    //设置 clusterName
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
	// 调用 nacos-service 的nacosUrlInstance接口注册实例
    reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);

}

通过向reqApi方法向nacos服务端注册当前实例数据,其实就是向 ${spring.cloud.nacos.discovery.server-addr}/nacos/v1/ns/instance 发送POST请求。该请求地址对应的nacos服务端的源码的naming工程中InstanceController的register方法,代码如下,

public String register(HttpServletRequest request) throws Exception {
    final String namespaceId = WebUtils
        .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
	//根据请求构建 Instance 对象
    final Instance instance = parseInstance(request);
	//注册 Instance 对象,serviceManager对象中保存了所有的服务对象。
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}

先根据请求对象构建Instance对象,然后通过serviceManager.registerInstance方法用来注册Instance对象,registerInstance代码如下

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
	// 如果 namespaceId 为 key 的数据为空,则创建 service ,并初始化service
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
	// 获取 service 对象
    Service service = getService(namespaceId, serviceName);
	// 如果 service为空 则报错
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                                 "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
	// 添加实例
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

如果 namespaceId为key的数据为空,则创建 service,并初始化service。然后调用addInstance添加实例对象,addInstance方法代码如下,

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
    throws NacosException {
	  // 根据 命名空间 和 服务名称 构建 key
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        // 获取 service
        Service service = getService(namespaceId, serviceName);
        // 同步锁
        synchronized (service) {
            // 获取服务下的实例集合(服务已有 + 新增的实例)
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            // 根据KEY添加服务的实例
            consistencyService.put(key, instances);
        }
}

addIpAddresses方法中会调用updateIpAddresses方法,且action为 add。该方法根据action的值来获取该服务下的最新实例集合(新增实例或删除实例加上目前服务已有的实例数据合集)。如果action为add表示新增,则方法最后返回的集合对象中会把该服务中已有的实例集合加上新增的实例集合数据一起返回 ;如果action为 remove表示删除,则方法最后返回的集合对象中会把该服务中已有的实例集合删除掉需要删除的实例集合数据。后面通过调用consistencyService.put(key, instances)方法来把updateIpAddresses方法返回的值直接添加consistencyService的实例中。updateIpAddresses方法的代码如下,

public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
    throws NacosException {
    // 从本地缓存中获取服务的实例数据
    Datum datum = consistencyService
        .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
    // 获取 当前服务下所有的 实例
    List<Instance> currentIPs = service.allIPs(ephemeral);
    // 创建当前实例数据map
    Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
    // 创建 当前实例Id set
    Set<String> currentInstanceIds = Sets.newHashSet();

    // 遍历当前服务的所有实例,添加到 创建当前实例数据 map 和 当前实例Id集合
    for (Instance instance : currentIPs) {
        currentInstances.put(instance.toIpAddr(), instance);
        currentInstanceIds.add(instance.getInstanceId());
    }
    // 构造 实例集合对象的 map
    Map<String, Instance> instanceMap;
    // 如果有缓存数据
    if (datum != null && null != datum.value) {
        // 从本地缓存中以及当前服务的内存数据获取最新服务的实例数据
        instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
    }
    // 如果没有缓存数据
    else {
        // 创建 instanceMap
        instanceMap = new HashMap<>(ips.length);
    }
    // 遍历参数传过来的实例对象
    for (Instance instance : ips) {
        // 如果 service 不包括 实例的 ClusterName 则创建 实例 Cluster,并初始化
        if (!service.getClusterMap().containsKey(instance.getClusterName())) {
            Cluster cluster = new Cluster(instance.getClusterName(), service);
            cluster.init();
            service.getClusterMap().put(instance.getClusterName(), cluster);
            Loggers.SRV_LOG
                .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                      instance.getClusterName(), instance.toJson());
        }
        // 如果是删除,则从 instanceMap 中 删除 该实例
        if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
            instanceMap.remove(instance.getDatumKey());
        }
        // 如果是新增
        else {
            //获取已存在的 实例
            Instance oldInstance = instanceMap.get(instance.getDatumKey());
            if (oldInstance != null) {
                instance.setInstanceId(oldInstance.getInstanceId());
            } else {
                // 生成 实例 id
                instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
            }
            // instanceMap 添加instance实例
            instanceMap.put(instance.getDatumKey(), instance);
        }

    }
    // 如果集合小于0 ,并且是新增操作则抛异常
    if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
        throw new IllegalArgumentException(
            "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
            .toJson(instanceMap.values()));
    }
    // 返回 服务中最新的实例数据
    return new CopyOnWriteArrayList<>(instanceMap.values());
}

通过updateIpAddresses方法拿到需要更新的实例集合对象后,再通过consistencyService.put(key, instances)把拿到的实例集合对象添加到实现了PersistentConsistencyServiceDelegateImpl或者EphemeralConsistencyService接口的实例对象中,consistencyService.put(key, instances)的源码如下,

@Override
public void put(String key, Record value) throws NacosException {
    // 根据key获取具体的 consistencyService ,并且向其中添加具体的 key 和 value
    mapConsistencyService(key).put(key, value);
}

根据key获取具体的 consistencyService ,并且向其中添加具体的 key 和 value。consistencyService中根据key获取集群的实例对象(临时服务对象EphemeralConsistencyService和持久服务对象PersistentConsistencyServiceDelegateImpl)

private ConsistencyService mapConsistencyService(String key) {
    // 根据key返回具体的服务对象
    return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}

如果是注册的临时实例节点,这里取到的是实现了ephemeralConsistencyService接口的DistroConsistencyServiceImpl 对象,它的put源码如下:

@Override
public void put(String key, Record value) throws NacosException {
    // 添加key 和 value
    onPut(key, value);
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                        globalConfig.getTaskDispatchPeriod() / 2);
}

通过onPut方法添加key 和 value,opPut方法的代码如下,

public void onPut(String key, Record value) {
    // 如果是临时节点实例,则创建 Datum 并保存在 dataStore 中
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        dataStore.put(key, datum);
    }
    // 如果 监听对象不包括 key 则返回
    if (!listeners.containsKey(key)) {
        return;
    }
    // 向notifier对象添加通知任务
    notifier.addTask(key, DataOperation.CHANGE);
}

如果是临时实例节点,则创建 Datum 并保存在 dataStore 中,然后通过notifier.addTask用来向notifier对象添加通知任务,且操作类型为DataOperation.CHANGE,addTask方法的代码如下:

public void addTask(String datumKey, DataOperation action) {
    // 如果services包括了当前的 datumKey ,并且是修改操作 则直接返回
    if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
        return;
    }
    // 如果是修改操作,则向 services 添加 datumKey
    if (action == DataOperation.CHANGE) {
        services.put(datumKey, StringUtils.EMPTY);
    }
    // 向 tasks中添加 Pair 对象
    tasks.offer(Pair.with(datumKey, action));
}

以上代码的tasks是用来存放具体实例key和动作类型的对象,它是一个ArrayBlockingQueue对象,DistroConsistencyServiceImpl 对象的init方法代码如下,

@PostConstruct
public void init() {
    GlobalExecutor.submitDistroNotifyTask(notifier);
}

根据以上代码可知,在DistroConsistencyServiceImpl 实例对象初始化之后会往GlobalExecutor线程池对象中添加了一个notifier对象。notifier对象为一个实现了Runnable 的实例。上面的代码会执行notifier对象的run方法,notifier的run方法代码如下:

public void run() {
    Loggers.DISTRO.info("distro notifier started");
    // 死循环遍历
    for (; ; ) {
        try {
            // 获取 tasks的数据,如果没有数据会阻塞当前线程,直到tasks有数据为止。
            Pair<String, DataOperation> pair = tasks.take();
            // 处理数据
            handle(pair);
        } catch (Throwable e) {
            Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
        }
    }
}

上面是一个死循环,tasks.take()是一个阻塞式获取数据的方法,如果tasks没有数据则会阻塞当前线程直到tasks.take()拿到数据,拿到数据之后会调用handle方法处理,handle代码如下,

private void handle(Pair<String, DataOperation> pair) {
    try {
        String datumKey = pair.getValue0();
        DataOperation action = pair.getValue1();
        // 先从 services 中删除 key
        services.remove(datumKey);

        int count = 0;
        // 根据 key 获取 服务对象数据
        ConcurrentLinkedQueue<RecordListener> recordListeners = listeners.get(datumKey);
        if (recordListeners == null) {
            Loggers.DISTRO.info("[DISTRO-WARN] RecordListener not found, key: {}", datumKey);
            return;
        }
        for (RecordListener listener : recordListeners) {
            count++;
            try {
                // 如果是新增
                if (action == DataOperation.CHANGE) {
                    Datum datum = dataStore.get(datumKey);
                    if (datum != null) {
                        // 更新 serivce 的实例数据
                        listener.onChange(datumKey, datum.value);
                    } else {
                        Loggers.DISTRO.info("[DISTRO-WARN] data not found, key: {}", datumKey);
                    }
                    continue;
                }
                // 如果是删除
                if (action == DataOperation.DELETE) {
                    listener.onDelete(datumKey);
                    continue;
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
            }
        }

        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO
                .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                       datumKey, count, action.name());
        }
    } catch (Throwable e) {
        Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
    }
}

根据action 为 DataOperation.CHANGE,代码中执行的代码分支为listener.onChange(datumKey, datum.value),该方法的逻辑为修改服务的实例数据,源码如下

public void onChange(String key, Instances value) throws Exception {

    Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);

    for (Instance instance : value.getInstanceList()) {

        if (instance == null) {
            // Reject this abnormal instance list:
            throw new RuntimeException("got null instance " + key);
        }

        if (instance.getWeight() > 10000.0D) {
            instance.setWeight(10000.0D);
        }

        if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
            instance.setWeight(0.01D);
        }
    }
    // 更新 service 的 实例集合
    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));

    recalculateChecksum();
}

以上代码先遍历所有的实例数据设置权值,再通过updateIPs方法更新服务实例,updateIPs方法的代码如下:

public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
    // 根据 clusterMap 创建 ipMap对象
    Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
    // 根据 clusterMap 初始化 ipMap对象
    for (String clusterName : clusterMap.keySet()) {
        ipMap.put(clusterName, new ArrayList<>());
    }
	// 遍历最新的实例集合数据
    for (Instance instance : instances) {
        try {
            if (instance == null) {
                Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                continue;
            }
            // 如果集群名称为null ,则设置默认的集群名称 DEFAULT
            if (StringUtils.isEmpty(instance.getClusterName())) {
                instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
            }
            // 如果当前 service 的clusterMap不包括 实例的 集群名称,则需要创建新的集群对象
            if (!clusterMap.containsKey(instance.getClusterName())) {
                Loggers.SRV_LOG
                    .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                          instance.getClusterName(), instance.toJson());
                Cluster cluster = new Cluster(instance.getClusterName(), this);
                cluster.init();
                getClusterMap().put(instance.getClusterName(), cluster);
            }

            // 如果当前 ipMap 不包括 当前实例的 集群名称,则需要创建新的集群对象
            List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
            if (clusterIPs == null) {
                clusterIPs = new LinkedList<>();
                ipMap.put(instance.getClusterName(), clusterIPs);
            }
			// 给当前的 集群对象赋值 实例数据。
            clusterIPs.add(instance);
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
        }
    }
	// 遍历 ipMap对象,给 clusterMap 替换最新的 entryIPs
    for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
        //make every ip mine
        List<Instance> entryIPs = entry.getValue();
        // 给 clusterMap 替换最新的 entryIPs
        clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
    }

    setLastModifiedMillis(System.currentTimeMillis());
    // 发布
    getPushService().serviceChanged(this);
    StringBuilder stringBuilder = new StringBuilder();

    for (Instance instance : allIPs()) {
        stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
    }

    Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
                         stringBuilder.toString());

}

以上代码先根据当前服务下的集群信息构造构造ipMap对象,然后遍历最新的实例集合数据更新ipMap对象,最后循环调用clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral)方法来更新当前集群中的实例列表数据。updateIps方法代码如下:

public void updateIps(List<Instance> ips, boolean ephemeral) {
    // 获取 本集群中的 实例集合
    Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
    // 根据old的实例数据 构建 hashmap
    HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
    // 根据实例的 key 添加到 oldIpMap中
    for (Instance ip : toUpdateInstances) {
        oldIpMap.put(ip.getDatumKey(), ip);
    }
    // 获取更新的 实例数据 List
    List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values());
    if (updatedIPs.size() > 0) {
        for (Instance ip : updatedIPs) {
            Instance oldIP = oldIpMap.get(ip.getDatumKey());

            // do not update the ip validation status of updated ips
            // because the checker has the most precise result
            // Only when ip is not marked, don't we update the health status of IP:
            if (!ip.isMarked()) {
                ip.setHealthy(oldIP.isHealthy());
            }
            if (ip.isHealthy() != oldIP.isHealthy()) {
                // ip validation status updated
                Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(),
                                     (ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());
            }

            if (ip.getWeight() != oldIP.getWeight()) {
                // ip validation status updated
                Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP.toString(),
                                     ip.toString());
            }
        }
    }
    // 获取新增的 实例数据
    List<Instance> newIPs = subtract(ips, oldIpMap.values());
    if (newIPs.size() > 0) {
        Loggers.EVT_LOG
            .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
                  getName(), newIPs.size(), newIPs.toString());

        for (Instance ip : newIPs) {
            HealthCheckStatus.reset(ip);
        }
    }
    // 获取删除的 实例数据
    List<Instance> deadIPs = subtract(oldIpMap.values(), ips);

    if (deadIPs.size() > 0) {
        Loggers.EVT_LOG
            .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
                  getName(), deadIPs.size(), deadIPs.toString());

        for (Instance ip : deadIPs) {
            HealthCheckStatus.remv(ip);
        }
    }
    // 根据传进来的 实例集合 创建需要更新的实例set 
    toUpdateInstances = new HashSet<>(ips);

    // 直接替换
    if (ephemeral) {
        ephemeralInstances = toUpdateInstances;
    } else {
        persistentInstances = toUpdateInstances;
    }
}

以上代码就是更新cluster对象下的实例数据逻辑,根据代码可知在cluster对象中更新实例数据就是拿传进来的实例列表创建set集合直接替换的。

二、服务实例列表拉取

客户端程序启动之后,会执行com.alibaba.cloud.nacos.discovery.NacosWatch类的start()方法,此方法中会执行以下语句,

namingService.subscribe(properties.getService(), properties.getGroup(),
                        Arrays.asList(properties.getClusterName()), eventListener);

此方法用来获取当前服务的实例数据,subscribe方法代码如下,

public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
    throws NacosException {
    // 获取服务列表数据
    hostReactor.subscribe(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","),
                          listener);
}

通过hostReactor.subscribe方法获取服务列表数据,subscribe方法的代码如下,

public void subscribe(String serviceName, String clusters, EventListener eventListener) {
    notifier.registerListener(serviceName, clusters, eventListener);
    // 获取服务列表数据
    getServiceInfo(serviceName, clusters);
}

通过getServiceInfo方法获取服务列表数据,getServiceInfo的代码如下:

NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
    return failoverReactor.getService(key);
}
// 根据服务名称和集群名称获取本地的服务列表数据
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
if (null == serviceObj) {
    serviceObj = new ServiceInfo(serviceName, clusters);
    serviceInfoMap.put(serviceObj.getKey(), serviceObj);
    updatingMap.put(serviceName, new Object());
    // 如果本地服务实例数据为null,则去获取最新的服务实例列表
    updateServiceNow(serviceName, clusters);
    updatingMap.remove(serviceName);

} else if (updatingMap.containsKey(serviceName)) {
    if (UPDATE_HOLD_INTERVAL > 0) {
        // hold a moment waiting for update finish
        synchronized (serviceObj) {
            try {
                serviceObj.wait(UPDATE_HOLD_INTERVAL);
            } catch (InterruptedException e) {
                NAMING_LOGGER
                    .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
            }
        }
    }
}
scheduleUpdateIfAbsent(serviceName, clusters);
return serviceInfoMap.get(serviceObj.getKey());

以上代码可知,会根据服务名称和clusters名称获取本地缓存serviceInfoMap对象中的服务列表数据。如果本地服务实例数据为null,则通过updateServiceNow方法去nacos服务端获取最新的服务实例列表。updateServiceNow方法代码如下:

try {
    // 更新本地服务方法
    updateService(serviceName, clusters);
} catch (NacosException e) {
    NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
}

updateService的代码如下:

public void updateService(String serviceName, String clusters) throws NacosException {
    ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    try {
		// 调用服务代理类获取服务实例列表,pushReceiver.getUdpPort()会随机生成一个udp端口
        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
        if (StringUtils.isNotEmpty(result)) {
            // 如果 result不为空,则向本地缓存 serviceInfoMap 添加服务实例列表
            processServiceJson(result);
        }
    } finally {
        if (oldService != null) {
            synchronized (oldService) {
                oldService.notifyAll();
            }
        }
    }
}

通过调用服务代理类serverProxy的queryList方法获取服务实例列表,pushReceiver.getUdpPort()会获pushReceiver的udp端口,pushReceiver对象是一个udp数据接收类,用来接收nacos服务器发送的udp数据,比如服务实例更新的消息。serverProxy.query方法的代码如下:

public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
    throws NacosException {
	// 构造请求参数
    final Map<String, String> params = new HashMap<String, String>(8);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put("clusters", clusters);
    // 客户端的upd端口,服务端回调客户端udp端口会用到
    params.put("udpPort", String.valueOf(udpPort));
    params.put("clientIP", NetUtils.localIP());
    params.put("healthyOnly", String.valueOf(healthyOnly));
	// 向nacos服务器获取服务列表数据,并返回
    return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
}

在构造的请求参数中包括了客户端的udpPort,该参数在服务端回调接口会用到。reqApi方法其实就向nacos服务器的/nacos/v1/ns/instance/list接口发送了请求消息,该接口对应的nacos服务端的源码的naming工程中InstanceController的list方法,代码如下,

@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {

    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);

    String agent = WebUtils.getUserAgent(request);
    String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
    String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
    int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
    String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
    boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));

    String app = WebUtils.optional(request, "app", StringUtils.EMPTY);

    String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);

    boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
    // 获取实例列表数据
    return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
                     healthyOnly);
}

以上代码先构造相关参数信息,然后通过doSrvIpxt方法来获取实例列表数据,doSrvIpxt代码如下:

public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
                            int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {

    ClientInfo clientInfo = new ClientInfo(agent);
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    // 根据命名空间id和服务名称获取服务
    Service service = serviceManager.getService(namespaceId, serviceName);
    long cacheMillis = switchDomain.getDefaultCacheMillis();

    // now try to enable the push
    try {
        // 如果端口大于0 ,并且是支持的客户端
        if (udpPort > 0 && pushService.canEnablePush(agent)) {
            // 添加 PushClient 对象
            pushService
                .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
                           pushDataSource, tid, app);
            cacheMillis = switchDomain.getPushCacheMillis(serviceName);
        }
    } catch (Exception e) {
        Loggers.SRV_LOG
            .error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
        cacheMillis = switchDomain.getDefaultCacheMillis();
    }
    // 如果服务对象为 null ,则构造数据返回
    if (service == null) {
        if (Loggers.SRV_LOG.isDebugEnabled()) {
            Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
        }
        result.put("name", serviceName);
        result.put("clusters", clusters);
        result.put("cacheMillis", cacheMillis);
        result.replace("hosts", JacksonUtils.createEmptyArrayNode());
        return result;
    }
    // 检查服务是否可用
    checkIfDisabled(service);

    List<Instance> srvedIPs;
    // 根据集群列表获取具体服务下面的实例列表
    srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));

    // filter ips using selector:
    if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
        srvedIPs = service.getSelector().select(clientIP, srvedIPs);
    }
    // 如果实例数据为空,则构造数据返回
    if (CollectionUtils.isEmpty(srvedIPs)) {

        if (Loggers.SRV_LOG.isDebugEnabled()) {
            Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
        }

        if (clientInfo.type == ClientInfo.ClientType.JAVA
            && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
            result.put("dom", serviceName);
        } else {
            result.put("dom", NamingUtils.getServiceName(serviceName));
        }

        result.put("name", serviceName);
        result.put("cacheMillis", cacheMillis);
        result.put("lastRefTime", System.currentTimeMillis());
        result.put("checksum", service.getChecksum());
        result.put("useSpecifiedURL", false);
        result.put("clusters", clusters);
        result.put("env", env);
        result.set("hosts", JacksonUtils.createEmptyArrayNode());
        result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
        return result;
    }

    Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
    ipMap.put(Boolean.TRUE, new ArrayList<>());
    ipMap.put(Boolean.FALSE, new ArrayList<>());
    // 构造健康和不健康的实例数据
    for (Instance ip : srvedIPs) {
        ipMap.get(ip.isHealthy()).add(ip);
    }

    if (isCheck) {
        result.put("reachProtectThreshold", false);
    }

    double threshold = service.getProtectThreshold();

    if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {

        Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);
        if (isCheck) {
            result.put("reachProtectThreshold", true);
        }

        ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
        ipMap.get(Boolean.FALSE).clear();
    }

    if (isCheck) {
        result.put("protectThreshold", service.getProtectThreshold());
        result.put("reachLocalSiteCallThreshold", false);

        return JacksonUtils.createEmptyJsonNode();
    }

    ArrayNode hosts = JacksonUtils.createEmptyArrayNode();
    // 构造返回的实例列表对象
    for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
        List<Instance> ips = entry.getValue();

        if (healthyOnly && !entry.getKey()) {
            continue;
        }

        for (Instance instance : ips) {

            // remove disabled instance:
            if (!instance.isEnabled()) {
                continue;
            }

            ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();

            ipObj.put("ip", instance.getIp());
            ipObj.put("port", instance.getPort());
            // deprecated since nacos 1.0.0:
            ipObj.put("valid", entry.getKey());
            ipObj.put("healthy", entry.getKey());
            ipObj.put("marked", instance.isMarked());
            ipObj.put("instanceId", instance.getInstanceId());
            ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
            ipObj.put("enabled", instance.isEnabled());
            ipObj.put("weight", instance.getWeight());
            ipObj.put("clusterName", instance.getClusterName());
            if (clientInfo.type == ClientInfo.ClientType.JAVA
                && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                ipObj.put("serviceName", instance.getServiceName());
            } else {
                ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
            }

            ipObj.put("ephemeral", instance.isEphemeral());
            hosts.add(ipObj);

        }
    }

    result.replace("hosts", hosts);
    if (clientInfo.type == ClientInfo.ClientType.JAVA
        && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
        result.put("dom", serviceName);
    } else {
        result.put("dom", NamingUtils.getServiceName(serviceName));
    }
    result.put("name", serviceName);
    result.put("cacheMillis", cacheMillis);
    result.put("lastRefTime", System.currentTimeMillis());
    result.put("checksum", service.getChecksum());
    result.put("useSpecifiedURL", false);
    result.put("clusters", clusters);
    result.put("env", env);
    result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
    return result;
}

以上代码其实就是根据命名空间id和服务名称获取服务对象,然后根据不同情况构造返回对象,正常情况会构造一个ServiceInfo类型的ObjectNode对象,整个具体过程请看上面的代码注释。最后返回构造的对象。

客户端中拿到请求/nacos/v1/ns/instance/list接口的返回值之后会转成一个ServiceInfo对象,并且把该对象赋值给本地的缓存对象serviceInfoMap,processServiceJson关键代码如下:

// 将返回值转换成 ServiceInfo 类型的对象
ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
// 把该对象添加到本地缓存中
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);

三、定时心跳任务

在客户端向nacos服务端注册服务的过程中,会调用com.alibaba.nacos.client.naming.NacosNamingService#registerInstance(java.lang.String, java.lang.String, com.alibaba.nacos.api.naming.pojo.Instance)方法,在该代码中有个判断逻辑,如果是临时实例则会创建一个BeatInfo对象添加到beatReactor中。代码如下:

public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    // 检查 实例是否合法
    // heart beat timeout must(默认15秒) > heart beat interval (默认5秒)
    // ip delete timeout must(默认30 秒)  > heart beat interval
    NamingUtils.checkInstanceIsLegal(instance);
    // 构建 groupName@@serviceName
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // 如果是临时实例,则创建心跳信息,定时给nacos服务发送
    if (instance.isEphemeral()) {
        // 构造心跳信息
        BeatInfo beatInfo = this.beatReactor.buildBeatInfo(groupedServiceName, instance);
        // 执行心跳定时任务
        this.beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
	// 向 nacos-service 注册实例
    this.serverProxy.registerService(groupedServiceName, groupName, instance);
}

beatInfo对象用来存储心跳信息,buildBeatInfo方法代码如下,

public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {
    BeatInfo beatInfo = new BeatInfo();
    beatInfo.setServiceName(groupedServiceName);
    beatInfo.setIp(instance.getIp());
    beatInfo.setPort(instance.getPort());
    beatInfo.setCluster(instance.getClusterName());
    beatInfo.setWeight(instance.getWeight());
    beatInfo.setMetadata(instance.getMetadata());
    beatInfo.setScheduled(false);
    beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
    return beatInfo;
}

beatReactor中有一个ScheduledExecutorService类型的executorService实例用来执行定时的线程,addBeatInfo的代码如下,

public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    BeatInfo existBeat = null;
    //fix #1733
    if ((existBeat = dom2Beat.remove(key)) != null) {
        existBeat.setStopped(true);
    }
    dom2Beat.put(key, beatInfo);
    // 线程池添加定时任务,默认 5 秒钟之后 执行 BeatTask
    executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
    MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}

根据上面的executorService.schedule()代码可知,BeatTask线程在固定的秒数之后执行,而BeatTask实现了Runnable接口,即执行BeatTask的run方法 。BeatTask的run方法代码如下,

public void run() {
    // 如果 beatInfo 设置了 stop ,则停止
    if (beatInfo.isStopped()) {
        return;
    }
    // 获取下一次延期执行的时间
    long nextTime = beatInfo.getPeriod();
    try {
        // 向服务端发送心跳信息
        JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
        long interval = result.get("clientBeatInterval").asLong();
        boolean lightBeatEnabled = false;
        if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
            lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
        }
        BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
        if (interval > 0) {
            nextTime = interval;
        }
        int code = NamingResponseCode.OK;
        if (result.has(CommonParams.CODE)) {
            code = result.get(CommonParams.CODE).asInt();
        }
        if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
            Instance instance = new Instance();
            instance.setPort(beatInfo.getPort());
            instance.setIp(beatInfo.getIp());
            instance.setWeight(beatInfo.getWeight());
            instance.setMetadata(beatInfo.getMetadata());
            instance.setClusterName(beatInfo.getCluster());
            instance.setServiceName(beatInfo.getServiceName());
            instance.setInstanceId(instance.getInstanceId());
            instance.setEphemeral(true);
            try {
                serverProxy.registerService(beatInfo.getServiceName(),
                                            NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
            } catch (Exception ignore) {
            }
        }
    } catch (NacosException ex) {
        NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                            JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());

    }
    // 重新提交定时任务,延期发送心跳信息
    executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}

以上代码先获取下一次延期执行的时间,再通过serverProxy.sendBeat()向服务端发送心跳信息,最后重新提交定时任务,延期发送心跳信息,serverProxy.sendBeat()代码如下,

public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
    if (NAMING_LOGGER.isDebugEnabled()) {
        NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
    }
    Map<String, String> params = new HashMap<String, String>(8);
    Map<String, String> bodyMap = new HashMap<String, String>(2);
    if (!lightBeatEnabled) {
        bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
    }
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
    params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
    params.put("ip", beatInfo.getIp());
    params.put("port", String.valueOf(beatInfo.getPort()));
    // 向nacos服务器发送心跳数据,并返回
    String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
    return JacksonUtils.toObj(result);
}

reqApi方法其实就向nacos服务器端的/nacos/v1/ns/instance/beat接口发送了put类型的请求消息,该接口对应的nacos服务端的源码的naming工程中InstanceController的beat方法,beat方法的代码如下,

@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
    // 获取心跳信息
    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    RsInfo clientBeat = null;
    // 如果 beat 数据不为空,则构造 RsInfo 类型的  clientBeat 实例
    if (StringUtils.isNotBlank(beat)) {
        clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
    }
    // 获取集群名称
    String clusterName = WebUtils
        .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    // 获取 实例的 ip
    String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
    // 获取 实例的 端口
    int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
    // 如果 clientBeat 不为空,则设置 相关的信息
    if (clientBeat != null) {
        if (StringUtils.isNotBlank(clientBeat.getCluster())) {
            clusterName = clientBeat.getCluster();
        } else {
            // fix #2533
            clientBeat.setCluster(clusterName);
        }
        ip = clientBeat.getIp();
        port = clientBeat.getPort();
    }
    // 获取 namespaceId
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    // 获取 serviceName
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    // 检查 ServiceName 的格式
    NamingUtils.checkServiceNameFormat(serviceName);
    Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
    // 根据 参数 获取 具体的实例
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
    // 如果 实例为 空
    if (instance == null) {
        // 如果 clientBeat 为空 则构造参数 code 为 20404的结果返回
        if (clientBeat == null) {
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }

        Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
                             + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
        // 如果 clientBeat 不为空 则构造 instance 数据,向 serviceManager 注册实例。
        instance = new Instance();
        instance.setPort(clientBeat.getPort());
        instance.setIp(clientBeat.getIp());
        instance.setWeight(clientBeat.getWeight());
        instance.setMetadata(clientBeat.getMetadata());
        instance.setClusterName(clusterName);
        instance.setServiceName(serviceName);
        instance.setInstanceId(instance.getInstanceId());
        instance.setEphemeral(clientBeat.isEphemeral());

        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }
    // 根据服务名称获取 服务
    Service service = serviceManager.getService(namespaceId, serviceName);
    // 如果服务为空 ,则抛异常
    if (service == null) {
        throw new NacosException(NacosException.SERVER_ERROR,
                                 "service not found: " + serviceName + "@" + namespaceId);
    }
    // 如果 clientBeat 为空,则创建该对象
    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(clusterName);
    }
    // 处理客户端的 心跳对象
    service.processClientBeat(clientBeat);
    //
    result.put(CommonParams.CODE, NamingResponseCode.OK);
    if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
    }
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}

先获取心跳信息,然后构造RsInfo类型的clientBeat实例。然后通过service.processClientBeat(clientBeat)方法处理客户端的心跳对象,processClientBeat方法的代码如下,

public void processClientBeat(final RsInfo rsInfo) {
    // 构造 ClientBeatProcessor 对象
    ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
    clientBeatProcessor.setService(this);
    clientBeatProcessor.setRsInfo(rsInfo);
    // 定时执行 ClientBeatProcessor 对象,这里是立即执行,延期时间为 0
    HealthCheckReactor.scheduleNow(clientBeatProcessor);
}

ClientBeatProcessor是一个实现了Runnable的类,HealthCheckReactor是一个定时任务线程池,scheduleNow方法表示立即执行clientBeatProcessor对象的run方法,clientBeatProcessor.run方法代码如下,

public void run() {
    Service service = this.service;
    if (Loggers.EVT_LOG.isDebugEnabled()) {
        Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
    }
    // 获取ip
    String ip = rsInfo.getIp();
    // 获取 集群名称
    String clusterName = rsInfo.getCluster();
    // 获取端口
    int port = rsInfo.getPort();
    // 从服务对象中获取集群对象
    Cluster cluster = service.getClusterMap().get(clusterName);
    // 从集群对象中获取所有的临时实例列表
    List<Instance> instances = cluster.allIPs(true);

    for (Instance instance : instances) {
        // 找到 ip 和端口相同的 实例数据
        if (instance.getIp().equals(ip) && instance.getPort() == port) {
            if (Loggers.EVT_LOG.isDebugEnabled()) {
                Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
            }
            // 更新 最后心跳时间
            instance.setLastBeat(System.currentTimeMillis());
            if (!instance.isMarked() && !instance.isHealthy()) {
                instance.setHealthy(true);
                Loggers.EVT_LOG
                    .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                          cluster.getService().getName(), ip, port, cluster.getName(),
                          UtilsAndCommons.LOCALHOST_SITE);
                getPushService().serviceChanged(service);
            }
        }
    }
}

以上代码可知,该方法主要用来更新客户端实例的最后心跳时间。

三、服务端接口

一、定时检查服务实例任务

在客户端注册服务的时候,会调用nacos服务端的com.alibaba.nacos.naming.controllers.InstanceController#register方法,其中会调用createEmptyService方法用来创建空的服务对象,最后会调用service.init()方法用来初始化服务对象,init方法代码如下

public void init() {
    // 定时执行 service 的 run 方法 处理超时的 instance
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}

通过调用HealthCheckReactor.scheduleCheck()方法来定时执行clientBeatCheckTask,scheduleCheck的代码如下,

public static void scheduleCheck(ClientBeatCheckTask task) {
    // 5秒之后执行 task,并且每次执行task完之后,5秒之后再次执行 task
    futureMap.computeIfAbsent(task.taskKey(),
                              k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}

以上代码给定时任务线程池GlobalExecutor提交了一个task任务,其中task是一个实现了Runable接口的类,线程池每次执行的就是ClientBeatCheckTask 的run方法,run方法代码如下,

public void run() {
    try {
        if (!getDistroMapper().responsible(service.getName())) {
            return;
        }

        if (!getSwitchDomain().isHealthCheckEnabled()) {
            return;
        }
        // 获取该服务下面的所有 注册实例集合
        List<Instance> instances = service.allIPs(true);
        // first set health status of instances:
        for (Instance instance : instances) {
            // 如果 当前时间 减去 实例的最新心跳时间 如果大于 实例配置的心跳超时时间(默认15秒)
            // 并且 实例的健康状态 true
            // 则设置服务的健康状态为 false
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    if (instance.isHealthy()) {
                        instance.setHealthy(false);
                        Loggers.EVT_LOG
                            .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                  instance.getIp(), instance.getPort(), instance.getClusterName(),
                                  service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                  instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                        getPushService().serviceChanged(service);
                        ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }
        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }
        // then remove obsolete instances:
        for (Instance instance : instances) {

            if (instance.isMarked()) {
                continue;
            }
            // 如果 当前时间 减去 实例的最新心跳时间 如果大于 实例配置的删除超时时间(默认30秒)
            // 则会调用 deleteIp 删除方法删除实例
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // delete instance
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                                     JacksonUtils.toJson(instance));
                // 删除实例
                deleteIp(instance);
            }
        }

    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }

}

以上代码就两个逻辑,一个逻辑是判断当前时间减去实例的最新心跳时间是否大于实例配置的心跳超时时间(默认15秒),如果大于则设置实例的健康状态为false;第二个逻辑是 判断当前时间减去实例的最新心跳时间 是否大于实例配置的删除超时时间(默认30秒),如果大于则调用deleteIp(instance);删除该实例,deleteIp的代码如下,

NamingProxy.Request request = NamingProxy.Request.newRequest();
request.appendParam("ip", instance.getIp()).appendParam("port", String.valueOf(instance.getPort()))
    .appendParam("ephemeral", "true").appendParam("clusterName", instance.getClusterName())
    .appendParam("serviceName", service.getName()).appendParam("namespaceId", service.getNamespaceId());
// 构造url地址
String url = "http://" + IPUtil.localHostIP() + IPUtil.IP_PORT_SPLITER + EnvUtil.getPort() + EnvUtil.getContextPath()
    + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl();

// delete instance asynchronously:
// 向本地服务器地址发送删除请求
HttpClient.asyncHttpDelete(url, null, null, new Callback<String>() {
    @Override
    public void onReceive(RestResult<String> result) {
        if (!result.ok()) {
            Loggers.SRV_LOG
                .error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}",
                       instance.toJson(), result.getMessage(), result.getCode());
        }
    }
    @Override
    public void onError(Throwable throwable) {
        Loggers.SRV_LOG
            .error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: ", instance.toJson(),
                   throwable);
    }
    @Override
    public void onCancel() {

    }
});

HttpClient.asyncHttpDelete方法其实就是向 ${spring.cloud.nacos.discovery.server-addr}/nacos/v1/ns/instance 发送Delete请求。该请求地址对应的nacos服务端的源码的naming工程中InstanceController的deregister方法,deregister代码如下,

@CanDistro
@DeleteMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String deregister(HttpServletRequest request) throws Exception {
    // 从请求参数中构造实例对象
    Instance instance = getIpAddress(request);
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);

    Service service = serviceManager.getService(namespaceId, serviceName);
    if (service == null) {
        Loggers.SRV_LOG.warn("remove instance from non-exist service: {}", serviceName);
        return "ok";
    }
	// 删除实例数据
    serviceManager.removeInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    return "ok";
}

removeInstance方法是关键,用来删除实例数据,removeInstance代码如下,

public void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
    throws NacosException {
    // 先获取服务对象
    Service service = getService(namespaceId, serviceName);
    // 服务对象加锁
    synchronized (service) {
        // 调用删除实例对象的方法
        removeInstance(namespaceId, serviceName, ephemeral, service, ips);
    }
}

removeInstance方法的代码如下,

private void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Service service,
                            Instance... ips) throws NacosException {
    // 构造 key
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    // 获取服务下的实例集合(服务已有 减去 需要删除实例)
    List<Instance> instanceList = substractIpAddresses(service, ephemeral, ips);

    Instances instances = new Instances();
    instances.setInstanceList(instanceList);
    // 根据KEY更新服务的实例
    consistencyService.put(key, instances);
}

substractIpAddresses方法用来获取该服务下已经减去需要删除实例的实例数据,其中调用的updateIpAddresses方法,action值为 remove。removeInstance方法的整体逻辑为通过updateIpAddresses方法拿到该服务中去掉删除实例之后的实例集合对象,并把该实例集合对象添加到consistencyService对象中,consistencyService.put(key, instances)里面的逻辑和客户端注册服务一样的逻辑。updateIpAddresses方法和consistencyService.put方法已经在客户端服务注册章节已经讲了,这里不再讲解。

二、服务实例更新推送

在客户端更新服务实例的过程中nacos服务端会调用com.alibaba.nacos.naming.core.Service#updateIPs()方法(客户端注册服务的过程请看客户端服务注册章节),在该方法中会调用getPushService().serviceChanged(this)来发布当前服务的修改事件,即会发布一个事件用来通知已经和nacos服务端通信过的客户端更新客户端本地的服务信息。serviceChanged 的代码如下,

public void serviceChanged(Service service) {
    // merge some change events to reduce the push frequency:
    if (futureMap
        .containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {
        return;
    }
    // 发布服务修改事件
    this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
}

applicationContext.publishEvent会触发一个ServiceChangeEvent事件,其实就是触发com.alibaba.nacos.naming.push.PushService#onApplicationEvent方法,其中逻辑为先根据命名空间id和服务名称获取所有的客户端map对象,然后遍历所有客户端对象PushClient 构造 ackEntry 对象,最后向具体的客户端发送 upd 消息。获取关键代码如下,

if (compressData != null) {
    ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
} else {
    // 构造 ackEntry 对象
    ackEntry = prepareAckEntry(client, prepareHostsQData(client), lastRefTime);
    // 添加缓存
    if (ackEntry != null) {
        cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
    }
}

Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                  client.getServiceName(), client.getAddrStr(), client.getAgent(),
                  (ackEntry == null ? null : ackEntry.key));
// 向具体的客户端发送 upd 消息
udpPush(ackEntry);

以上代码prepareHostsQData的逻辑就是获取该服务下客户端所属服务的所有实例数据,并且构造具体的Map<String,Object>对象,prepareHostsQData代码如下,

private static Map<String, Object> prepareHostsData(PushClient client) throws Exception {
    Map<String, Object> cmd = new HashMap<String, Object>(2);
    cmd.put("type", "dom");
    // 获取客户端所属服务的所有实例数据
    cmd.put("data", client.getDataSource().getData(client));
    return cmd;
}

udpPush(ackEntry)里面封装了发送udp消息的关键代码,ackEntry封装了udp的数据信息。

在客户端获取服务实例列表的时候,会生成一个PushReceiver对象,该对象用来监听和接收nacos服务端发送的udp数据。该对象实现了Runnable接口,并且在构造方法中把自己提交给了一个内部属性的线程池对象。构造方法如下,

public PushReceiver(HostReactor hostReactor) {
    try {
        this.hostReactor = hostReactor;
        this.udpSocket = new DatagramSocket();
        this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.push.receiver");
                return thread;
            }
        });

        this.executorService.execute(this);
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] init udp socket failed", e);
    }
}

根据以上代码可知,所以在创建PushReceiver对象之后会执行run方法,run方法的代码如下,

public void run() {
    while (!closed) {
        try {
            // byte[] is initialized with 0 full filled by default
            byte[] buffer = new byte[UDP_MSS];
            // 构造 DatagramPacket 对象
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
			// 监听upd数据
            udpSocket.receive(packet);
			// 构造服务对象的string 数据
            String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
            NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());

            PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
            String ack;
            if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                // 更新本地缓存 serviceInfoMap 的服务对象
                hostReactor.processServiceJson(pushPacket.data);
                // send ack to server
                ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
                    + "\"\"}";
            } else if ("dump".equals(pushPacket.type)) {
                // dump data to server
                ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
                    + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap()))
                    + "\"}";
            } else {
                // do nothing send ack only
                ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                    + "\", \"data\":" + "\"\"}";
            }

            udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
                                              packet.getSocketAddress()));
        } catch (Exception e) {
            if (closed) {
                return;
            }
            NAMING_LOGGER.error("[NA] error while receiving push data", e);
        }
    }
}

以上代码用while一直监听udpSocket的客户端upd的端口。当接收到从nacos服务端发送过来的udp数据之后会接着调用 hostReactor.processServiceJson()方法来更新客户端本地的serviceInfoMap 的服务对象。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK