8

Nacos服务发现原理分析 - 码出code

 1 year ago
source link: https://www.cnblogs.com/jeremylai7/p/17137747.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

微服务将自己的实例注册到nacos注册中心,nacos服务端存储了注册列表,然后通过ribbon调用服务,具体是如何调用?如果nacos服务挂了,还能正常调用服务吗?调用的服务列表发生变化,调用方是如何感知变化的?带着这些问题,来探索一下服务发现的原理。

版本 2.1.1

  • Nacos Server:2.1.1
  • spring-cloud-starter-alibaba:2.1.1.RELEASE
  • spring-boot:2.1.1.RELEASE
  • spring-cloud-starter-netflix-ribbon:2.1.1.RELEASE

客户端和服务端版本号都为2.1.1

从 Ribbon 讲起

使用ribbon来调用服务,就添加ribbon依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
</dependency>

ribbon依赖包含spring-cloud-commons依赖,而在spring-cloud-commons包中spring.factories自动配置LoadBalancerAutoConfiguration类:

@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();

@Bean
public LoadBalancerInterceptor ribbonInterceptor(
    LoadBalancerClient loadBalancerClient,
    LoadBalancerRequestFactory requestFactory) {
  return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}

只要标注了@LoadBalanced注解的restTemplates都会添加负载均衡拦截器LoadBalancerInterceptor

使用Ribbon组件调用服务:

restTemplate.getForObject("http://service-name",String.class);

restTemplatehttp请求方法,最终会调用到doExecute方法。doExecute在发起http请求之前,会先执行LoadBalancerInterceptor负载均衡拦截器的intercept方法。 该方法调用execute方法。

而在execute方法中,主要有两个方法:

ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
Server server = getServer(loadBalancer, hint);

execute先通过getLoadBalancer获取ILoadBalancer实例,然后再通过getServer获取Server实例。

getLoadBalancer最终会调用RibbonServerList接口,具体调用流程:

getLoadBalancer() ->
ZoneAwareLoadBalancer -> 
DynamicServerListLoadBalancer -> 
restOfInit()->
updateListOfServers()->
ServerList.getUpdatedListOfServers()->

Nacos实现类NacosServerList实现了ServerList接口。

总之我们在进行微服务调用的时候,Ribbon最终会调用NacosServerList类中的getUpdatedListOfServers方法。

Nacos 获取服务

NacosServerList类的getUpdatedListOfServers方法调用了该类的getServers方法:

private List<NacosServer> getServers() {
  try {
    // 获取分组 
    String group = discoveryProperties.getGroup();
    // 重点,查询实例列表
    List<Instance> instances = discoveryProperties.namingServiceInstance()
        .selectInstances(serviceId, group, true);
    return instancesToServerList(instances);
  }
  catch (Exception e) {
    throw new IllegalStateException(
        "Can not get service instances from nacos, serviceId=" + serviceId,
        e);
  }
}

重点看NacosNamingService类的selectInstances方法,会调用以下selectInstances三个重载方法:

@Override
public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException {
    return selectInstances(serviceName, groupName, healthy, true);
}
    
@Override
public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy, boolean subscribe) throws NacosException {
    return selectInstances(serviceName, groupName, new ArrayList<String>(), healthy, subscribe);
}
    
@Override
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {

    ServiceInfo serviceInfo;
    // 默认订阅
    if (subscribe) {
        // 获取服务,这是重点
        serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
    } else {
        serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
    }
    return selectInstances(serviceInfo, healthy);
}

最后一个selectInstances方法里面的hostReactor.getServiceInfo方法是获取服务的核心方法:

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    // 先在本地缓存查询
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
    // 查询不到 
    if (null == serviceObj) {
        serviceObj = new ServiceInfo(serviceName, clusters);

        serviceInfoMap.put(serviceObj.getKey(), serviceObj);
        updatingMap.put(serviceName, new Object());
        // 请求Nacos Server实例,并更新服务实例
        updateServiceNow(serviceName, clusters);
        updatingMap.remove(serviceName);

    } else if (updatingMap.containsKey(serviceName)) {

        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                }
            }
        }
    }
    // 定时更新本地缓存
    scheduleUpdateIfAbsent(serviceName, clusters);

    return serviceInfoMap.get(serviceObj.getKey());
}

getServiceInfo是服务发现的核心方法,先查询serviceInfoMap集合中查询本地缓存,本地缓存查询不到就请求Nacos Server实例,并更新本地缓存。

请求Nacos Server实例,实际就是发送http请求Nacos Server

public void updateServiceNow(String serviceName, String clusters) {
    ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    try {
        // 调用 Nacos Server 查询服务
        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
        // 结果不为空,更新缓存  
        if (StringUtils.isNotEmpty(result)) {
            processServiceJSON(result);
        }
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
    } finally {
        if (oldService != null) {
            synchronized (oldService) {
                oldService.notifyAll();
            }
        }
    }
}

//向 Nacos Server发起 HTTP 列表查询
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)throws NacosException {

    final Map<String, String> params = new HashMap<String, String>(8);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put("clusters", clusters);
    params.put("udpPort", String.valueOf(udpPort));
    params.put("clientIP", NetUtils.localIP());
    params.put("healthyOnly", String.valueOf(healthyOnly));

    return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET);
}

queryList方法主要封装号请求参数,然后向Nacos Server服务端发送http请求。

当服务端实例发生改变时,Nacos Server会推送最新的实例给服务端。

服务发现是先获取本地缓存,如果没有本地缓存,就请求Nacos Server服务端获取数据,如果Nacos Server挂了,也不会影响服务的调用。

  • Ribbon
    • 项目启动时,会创建一个负载均衡拦截器。
    • Ribbon发起服务请求开始,最终会调用到拦截器的拦截方法。
    • 拦截方法又调用ServerList获取实例接口,而NacosServerList实现获取实例列表。
  • Nacos调用服务
    • NacosServerList实现了获取服务实例列表。
    • NacosServerListselectInstances方法最终调用了hostReactor.getServiceInfo方法
    • getServiceInfo方法先从serviceInfoMap集合中获取本地缓存,如果本地缓存找不到,就请求Nacos Server获取服务实例,并更新本地缓存。
    • 获取服务之后,定时更新本地缓存。

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK