10

Nacos - 服务端处理实例列表请求

 3 years ago
source link: https://segmentfault.com/a/1190000038835528
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

服务端处理实例列表请求的入口是InstanceController#list。

InstanceController#list

/**
 * Entry point for the instance-list request (article excerpt).
 *
 * <p>The code that derives the arguments passed below from {@code request}
 * is omitted in this excerpt; only the final delegation is shown.
 *
 * @param request the incoming HTTP request
 * @return the JSON node produced by {@code doSrvIpxt}
 * @throws Exception propagated from the omitted code or from {@code doSrvIpxt}
 */
public ObjectNode list(HttpServletRequest request) throws Exception {
        
    // Remaining request handling omitted in this excerpt.
    // Fetch the instance list for the service.
    return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
            healthyOnly);
}

InstanceController#doSrvIpxt

这里主要是在满足条件时注册用于UDP推送的Client:《Nacos - 服务端处理心跳请求》一文中提到的clients就是在这里加入的。
另外就是判断保护阈值:如果健康实例占全部实例的比例小于或等于阈值,则把健康和不健康的实例都返回;如果高于阈值,就只返回健康的实例。

/**
 * Builds the JSON response for an instance-list query on a single service (article excerpt).
 *
 * <p>Visible steps, in order:
 * <ol>
 *   <li>Try to register the caller as a push client when a UDP port is supplied.</li>
 *   <li>Return a response with an empty {@code hosts} array if the service is not found.</li>
 *   <li>Collect the instances of the requested clusters and run them through the
 *       service's selector, if one is set.</li>
 *   <li>Return a response with an empty {@code hosts} array if no instance remains.</li>
 *   <li>Split the instances by health and apply the protect threshold.</li>
 * </ol>
 * The code that copies {@code ipMap} into the result is omitted in this excerpt.
 *
 * @param namespaceId namespace used to look up the service
 * @param serviceName name of the service being queried
 * @param agent       client agent string; used to build {@code ClientInfo} and to decide
 *                    whether push can be enabled
 * @param clusters    cluster names separated by {@code ","}
 * @param clientIP    caller IP; used for the push address and as selector input
 * @param udpPort     caller UDP port; push registration is attempted only when it is &gt; 0
 * @param env         echoed back under {@code "env"} in the no-instance response
 * @param isCheck     when true, a {@code "reachProtectThreshold"} flag is written to the result
 * @param app         forwarded to {@code pushService.addClient}
 * @param tid         forwarded to {@code pushService.addClient}
 * @param healthyOnly not referenced in the visible part of this excerpt
 * @return the JSON node describing the service and its hosts
 * @throws Exception declared by the method; presumably raised by {@code checkIfDisabled}
 *                   for a disabled service (not shown here, confirm against the full source)
 */
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
            int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
        
    ClientInfo clientInfo = new ClientInfo(agent);
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    // Look up the Service (per the original note, served from the serviceMap cache).
    Service service = serviceManager.getService(namespaceId, serviceName);
    long cacheMillis = switchDomain.getDefaultCacheMillis();
    
    // now try to enable the push
    try {
        // Only when a UDP port was supplied and the client agent is eligible for push.
        if (udpPort > 0 && pushService.canEnablePush(agent)) {
            // This is where the clients mentioned in the heartbeat article are added.
            pushService
                    .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
                            pushDataSource, tid, app);
            cacheMillis = switchDomain.getPushCacheMillis(serviceName);
        }
    } catch (Exception e) {
        // Push registration is best-effort: log the failure and fall back to the default cache time.
        Loggers.SRV_LOG
                .error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
        cacheMillis = switchDomain.getDefaultCacheMillis();
    }
    // Service not found: return a minimal response with an empty host list.
    if (service == null) {
        if (Loggers.SRV_LOG.isDebugEnabled()) {
            Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
        }
        result.put("name", serviceName);
        result.put("clusters", clusters);
        result.put("cacheMillis", cacheMillis);
        result.replace("hosts", JacksonUtils.createEmptyArrayNode());
        return result;
    }
    // Check whether the service is disabled.
    checkIfDisabled(service);
    
    List<Instance> srvedIPs;
    // Collect the instances belonging to the requested clusters.
    srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
    
    // filter ips using selector:
    // applied only when the service has a selector and clientIP is not blank
    if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
        srvedIPs = service.getSelector().select(clientIP, srvedIPs);
    }
    // No instance left after filtering: return a response with an empty host list.
    if (CollectionUtils.isEmpty(srvedIPs)) {
        
        if (Loggers.SRV_LOG.isDebugEnabled()) {
            Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
        }
        
        // Java clients at version 1.0.0 or later get the service name as-is under "dom";
        // every other client gets the value returned by NamingUtils.getServiceName.
        if (clientInfo.type == ClientInfo.ClientType.JAVA
                && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
            result.put("dom", serviceName);
        } else {
            result.put("dom", NamingUtils.getServiceName(serviceName));
        }
        
        result.put("name", serviceName);
        result.put("cacheMillis", cacheMillis);
        result.put("lastRefTime", System.currentTimeMillis());
        result.put("checksum", service.getChecksum());
        result.put("useSpecifiedURL", false);
        result.put("clusters", clusters);
        result.put("env", env);
        result.set("hosts", JacksonUtils.createEmptyArrayNode());
        result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
        return result;
    }
    // Partition the instances: TRUE -> healthy, FALSE -> unhealthy.
    Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
    ipMap.put(Boolean.TRUE, new ArrayList<>());
    ipMap.put(Boolean.FALSE, new ArrayList<>());

    for (Instance ip : srvedIPs) {
        ipMap.get(ip.isHealthy()).add(ip);
    }
    
    if (isCheck) {
        result.put("reachProtectThreshold", false);
    }
    
    double threshold = service.getProtectThreshold();
    // Protect threshold: when the ratio of healthy instances to all instances is less than
    // or equal to the threshold, move the unhealthy instances into the healthy list so that
    // both healthy and unhealthy instances are returned.
    if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {
        
        Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);
        if (isCheck) {
            result.put("reachProtectThreshold", true);
        }
        
        ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
        ipMap.get(Boolean.FALSE).clear();
    }
    
    // Remainder omitted in this excerpt: it fills the result from the contents of ipMap.
    return result;
}

流程图如下:
image


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK