Nacos - How the Server Handles Heartbeat Requests

 3 years ago
source link: https://segmentfault.com/a/1190000038834691

The server receives heartbeat requests in the InstanceController#beat method.

InstanceController#beat

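This method checks whether the instance already exists. If it does not, the instance is created and registered first, and then the heartbeat is processed.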

public ObjectNode beat(HttpServletRequest request) throws Exception {
        
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    // Set the heartbeat interval; the client adopts this value as its own beat interval
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    // (other code omitted)
    // Look up the Instance by namespaceId, serviceName, clusterName, ip and port
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
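    // If there is no such instance, register it
    if (instance == null) {
        // clientBeat is built from the "beat" parameter: on the first beat the parameter carries data, so clientBeat is created
        // On later beats the instance normally exists, so a null instance here suggests it has been removed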
        if (clientBeat == null) {
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }
        // (other code omitted: a new Instance is built from clientBeat here)
        // Register the instance
        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }
    // Get the Service from the serviceMap cache
    Service service = serviceManager.getService(namespaceId, serviceName);
    
    if (service == null) {
        throw new NacosException(NacosException.SERVER_ERROR,
                "service not found: " + serviceName + "@" + namespaceId);
    }
    // Not the first beat: assemble clientBeat from the request parameters
    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(clusterName);
    }
    // Process the heartbeat
    service.processClientBeat(clientBeat);
    
    result.put(CommonParams.CODE, NamingResponseCode.OK);
    if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
    }
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}
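
The branching in beat can be summarised in a minimal, self-contained sketch. It does not use any Nacos classes: BeatFlowSketch, the registry map keyed by "ip:port", and the string result codes are hypothetical stand-ins chosen for illustration, and the hasBeatPayload flag plays the role of "clientBeat != null".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the controller logic; not the Nacos implementation.
class BeatFlowSketch {
    // Registered instances keyed by "ip:port"; the value is the last beat timestamp.
    private final Map<String, Long> registry = new HashMap<>();

    // hasBeatPayload mirrors "clientBeat != null": only a beat that carries
    // instance data can be used to (re-)register an unknown instance.
    String beat(String ip, int port, boolean hasBeatPayload, long now) {
        String key = ip + ":" + port;
        if (!registry.containsKey(key)) {
            if (!hasBeatPayload) {
                // Unknown instance and nothing to rebuild it from.
                return "RESOURCE_NOT_FOUND";
            }
            registry.put(key, now); // register from the beat payload
        }
        registry.put(key, now);     // refresh the last beat time
        return "OK";
    }

    boolean isRegistered(String ip, int port) {
        return registry.containsKey(ip + ":" + port);
    }

    public static void main(String[] args) {
        BeatFlowSketch server = new BeatFlowSketch();
        System.out.println(server.beat("10.0.0.1", 8080, false, 1L)); // no payload, unknown instance
        System.out.println(server.beat("10.0.0.1", 8080, true, 2L));  // payload present: registers
        System.out.println(server.beat("10.0.0.1", 8080, false, 3L)); // known instance: accepted
    }
}
```

The point of the sketch is the ordering: the "not found" answer is only possible when the instance is unknown and the request carries no beat data.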

ServiceManager#getInstance

Looks up an instance by IP and port.

public Instance getInstance(String namespaceId, String serviceName, String cluster, String ip, int port) {
    // Get the Service from the serviceMap cache
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        return null;
    }
    
    List<String> clusters = new ArrayList<>();
    clusters.add(cluster);
    // Get the list of Instances that belong to the given clusters
    List<Instance> ips = service.allIPs(clusters);
    if (ips == null || ips.isEmpty()) {
        return null;
    }
    // Find the instance that matches the ip and port
    for (Instance instance : ips) {
        if (instance.getIp().equals(ip) && instance.getPort() == port) {
            return instance;
        }
    }
    
    return null;
}

Service#processClientBeat

Wraps the beat in a Runnable and submits it to the thread pool.

public void processClientBeat(final RsInfo rsInfo) {
    // Create a ClientBeatProcessor; it is a Runnable, so the thread pool will call its run method
    ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
    clientBeatProcessor.setService(this);
    clientBeatProcessor.setRsInfo(rsInfo);
    HealthCheckReactor.scheduleNow(clientBeatProcessor);
}
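
As a rough illustration of this hand-off, the sketch below wraps the work in a Runnable and schedules it with zero delay on a ScheduledExecutorService, so the caller returns without waiting for the beat to be processed. It assumes that scheduleNow behaves like a zero-delay schedule; BeatSchedulingSketch and its lastBeat field are hypothetical names, not Nacos classes.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of "wrap in a Runnable, hand to a pool"; not Nacos code.
class BeatSchedulingSketch {
    private final ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    final AtomicLong lastBeat = new AtomicLong(0L);

    // Returns immediately; the pool thread runs the processor later.
    ScheduledFuture<?> processClientBeat(long beatTimeMillis) {
        Runnable processor = () -> lastBeat.set(beatTimeMillis);
        // Assumption: "schedule now" means a delay of zero.
        return pool.schedule(processor, 0, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        pool.shutdown();
    }

    public static void main(String[] args) throws Exception {
        BeatSchedulingSketch service = new BeatSchedulingSketch();
        // get() is called only so the demo can print a deterministic result.
        service.processClientBeat(System.currentTimeMillis()).get();
        System.out.println("beat recorded: " + (service.lastBeat.get() > 0));
        service.shutdown();
    }
}
```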

ClientBeatProcessor#run

Finds the matching Instance, updates its last heartbeat time, marks it healthy if it was unhealthy, and finally broadcasts the change.

public void run() {
    Service service = this.service;
    if (Loggers.EVT_LOG.isDebugEnabled()) {
        Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
    }
    
    String ip = rsInfo.getIp();
    String clusterName = rsInfo.getCluster();
    int port = rsInfo.getPort();
    Cluster cluster = service.getClusterMap().get(clusterName);
    // Get all Instances in the cluster
    List<Instance> instances = cluster.allIPs(true);
    
    for (Instance instance : instances) {
        // Find the Instance that matches the ip and port
        if (instance.getIp().equals(ip) && instance.getPort() == port) {
            if (Loggers.EVT_LOG.isDebugEnabled()) {
                Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
            }
            // Update the last heartbeat time
            instance.setLastBeat(System.currentTimeMillis());
            // If the instance is not marked and is currently unhealthy, set it to healthy
            if (!instance.isMarked()) {
                if (!instance.isHealthy()) {
                    instance.setHealthy(true);
                    Loggers.EVT_LOG
                            .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                                    cluster.getService().getName(), ip, port, cluster.getName(),
                                    UtilsAndCommons.LOCALHOST_SITE);
                    // Broadcast the change
                    getPushService().serviceChanged(service);
                }
            }
        }
    }
}
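
One detail worth making explicit is that the broadcast only happens on the unhealthy-to-healthy transition, not on every beat. The following is a minimal sketch of that behaviour with hypothetical types (BeatRefreshSketch, Inst, and a pushCount counter standing in for the call to serviceChanged); it is not the Nacos implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the beat-refresh loop; names are illustrative only.
class BeatRefreshSketch {
    static class Inst {
        final String ip;
        final int port;
        long lastBeat;
        boolean healthy;
        boolean marked; // a marked instance never has its health changed by a beat

        Inst(String ip, int port, boolean healthy) {
            this.ip = ip;
            this.port = port;
            this.healthy = healthy;
        }
    }

    final List<Inst> instances = new ArrayList<>();
    int pushCount = 0; // stands in for "notify subscribers that the service changed"

    void onBeat(String ip, int port, long now) {
        for (Inst inst : instances) {
            if (inst.ip.equals(ip) && inst.port == port) {
                inst.lastBeat = now; // always refreshed
                if (!inst.marked && !inst.healthy) {
                    inst.healthy = true;
                    pushCount++;     // only on the unhealthy -> healthy transition
                }
            }
        }
    }

    public static void main(String[] args) {
        BeatRefreshSketch service = new BeatRefreshSketch();
        service.instances.add(new Inst("10.0.0.1", 8080, false));
        service.onBeat("10.0.0.1", 8080, 100L);
        service.onBeat("10.0.0.1", 8080, 200L);
        System.out.println("pushes: " + service.pushCount);
    }
}
```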

PushService#onApplicationEvent

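After the change is broadcast, classes that listen for ServiceChangeEvent have their onApplicationEvent method invoked. In PushService, this method mainly builds the UDP payload and sends it.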

public void onApplicationEvent(ServiceChangeEvent event) {
    Service service = event.getService();
    String serviceName = service.getName();
    String namespaceId = service.getNamespaceId();

    Future future = GlobalExecutor.scheduleUdpSender(() -> {
        try {
            Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
            ConcurrentMap<String, PushClient> clients = clientMap
                    .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            if (MapUtils.isEmpty(clients)) {
                return;
            }

            Map<String, Object> cache = new HashMap<>(16);
            long lastRefTime = System.nanoTime();
            // Iterate over the PushClient collection
            for (PushClient client : clients.values()) {
                // Skip and remove clients that have expired (zombies)
                if (client.zombie()) {
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    clients.remove(client.toString());
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    continue;
                }

                Receiver.AckEntry ackEntry;
                Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                byte[] compressData = null;
                Map<String, Object> data = null;
                if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                    org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                    compressData = (byte[]) (pair.getValue0());
                    data = (Map<String, Object>) pair.getValue1();

                    Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                }
                // Build the UDP payload; the compressIfNecessary method decides whether to compress it (data larger than 1 KB is compressed)
                if (compressData != null) {
                    ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                } else {
                    ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                    if (ackEntry != null) {
                        cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                    }
                }

                Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                        client.getServiceName(), client.getAddrStr(), client.getAgent(),
                        (ackEntry == null ? null : ackEntry.key));
                // Send the UDP packet
                udpPush(ackEntry);
            }
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);

        } finally {
            futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
        }

    }, 1000, TimeUnit.MILLISECONDS);

    futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);

}
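
The local cache map in the loop above lets clients that share a cache key reuse one prepared payload instead of rebuilding it per client. A minimal sketch of that idea follows; PushCacheSketch, buildPayload, and the shape of the clients map are hypothetical, and the cacheEnabled flag stands in for the getDefaultPushCacheMillis() check.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of per-push payload reuse; not the Nacos implementation.
class PushCacheSketch {
    int builds = 0; // how many times a payload had to be built

    private byte[] buildPayload(String cacheKey) {
        builds++;
        return ("hosts for " + cacheKey).getBytes(StandardCharsets.UTF_8);
    }

    // clients maps a client address to its cache key (an assumption for the demo).
    Map<String, byte[]> pushAll(Map<String, String> clients, boolean cacheEnabled) {
        Map<String, byte[]> cache = new HashMap<>(); // lives for one push round only
        Map<String, byte[]> sent = new LinkedHashMap<>();
        for (Map.Entry<String, String> client : clients.entrySet()) {
            String key = client.getValue();
            byte[] payload = cacheEnabled ? cache.get(key) : null;
            if (payload == null) {
                payload = buildPayload(key);
                cache.put(key, payload);
            }
            sent.put(client.getKey(), payload); // stands in for the UDP send
        }
        return sent;
    }

    public static void main(String[] args) {
        Map<String, String> clients = new LinkedHashMap<>();
        clients.put("10.0.0.1:5000", "agentA");
        clients.put("10.0.0.2:5000", "agentA");
        clients.put("10.0.0.3:5000", "agentB");
        PushCacheSketch push = new PushCacheSketch();
        push.pushAll(clients, true);
        System.out.println("payloads built: " + push.builds);
    }
}
```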

PushService#udpPush

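Sends the UDP packet. If no acknowledgement arrives, the packet is resent until the retry count exceeds MAX_RETRY_TIMES, with a check scheduled every 10 seconds (ACK_TIMEOUT_NANOS).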

private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
    if (ackEntry == null) {
        Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
        return null;
    }
    // Still not acknowledged after the maximum number of retries: remove the entry from ackMap and udpSendTimeMap
    if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
        Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;
        return ackEntry;
    }

    try {
        if (!ackMap.containsKey(ackEntry.key)) {
            totalPush++;
        }
        ackMap.put(ackEntry.key, ackEntry);
        udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());

        Loggers.PUSH.info("send udp packet: " + ackEntry.key);
        // Send over UDP
        udpSocket.send(ackEntry.origin);

        ackEntry.increaseRetryTime();
        // Check again after the ACK timeout (10 seconds)
        GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
                TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);

        return ackEntry;
    } catch (Exception e) {
        Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data,
                ackEntry.origin.getAddress().getHostAddress(), e);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;

        return null;
    }
}

Retransmitter#run

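Runs 10 seconds after each send and checks whether the push has been acknowledged. If the key is still in ackMap, it calls udpPush again, which stops once the retry count exceeds MAX_RETRY_TIMES.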

public void run() {
    if (ackMap.containsKey(ackEntry.key)) {
        Loggers.PUSH.info("retry to push data, key: " + ackEntry.key);
        udpPush(ackEntry);
    }
}

Receiver#run

When PushService is loaded, its static initializer starts the Receiver thread.

static {
    // (other code omitted)
    Receiver receiver = new Receiver();
    Thread inThread = new Thread(receiver);
    inThread.setDaemon(true);
    inThread.setName("com.alibaba.nacos.naming.push.receiver");
    inThread.start();
    // (other code omitted)
}

Its run method loops in a while (true); when an acknowledgement arrives, it removes the corresponding key from ackMap.

public void run() {
    while (true) {
        // (other code omitted)
        String ackKey = getAckKey(ip, port, ackPacket.lastRefTime);
        AckEntry ackEntry = ackMap.remove(ackKey);
        // (other code omitted)
    }
}

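When broadcasting, an entry is put into ackMap; if the send itself fails, the entry is removed from ackMap. Sometimes the UDP request gets no response, in which case the entry stays in ackMap. The Retransmitter then checks ackMap after 10 seconds, and if the push is still unacknowledged it retries, until the maximum number of retries is reached. A separate thread listens for UDP responses and removes the corresponding entry from ackMap once a response arrives. These UDP packets are sent to the client; the article "Nacos - HostReactor的创建" (on the creation of HostReactor) describes how the client handles them and refreshes its own information.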
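
The interplay between the sender, the Retransmitter and the Receiver can be modelled without any networking. The sketch below is a simplification under stated assumptions: AckRetrySketch is a hypothetical class, the retry count is stored directly in ackMap rather than on an AckEntry, the sends counter replaces the real socket call, and the value of MAX_RETRY_TIMES is chosen only for the demo, so check the constant in your Nacos version.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the ack/retry bookkeeping; no real UDP traffic is sent.
class AckRetrySketch {
    static final int MAX_RETRY_TIMES = 1; // demo value, an assumption

    // Pending pushes: key -> number of sends so far.
    final Map<String, Integer> ackMap = new ConcurrentHashMap<>();
    int sends = 0;
    int failedPush = 0;

    // Mirrors udpPush: give up once the retry budget is exhausted,
    // otherwise "send" and remember that an ack is still pending.
    boolean push(String key) {
        int retries = ackMap.getOrDefault(key, 0);
        if (retries > MAX_RETRY_TIMES) {
            ackMap.remove(key);
            failedPush++;
            return false;
        }
        sends++; // stands in for the socket send
        ackMap.put(key, retries + 1);
        return true;
    }

    // Mirrors Retransmitter#run: resend only if the ack has not arrived.
    void retransmitIfPending(String key) {
        if (ackMap.containsKey(key)) {
            push(key);
        }
    }

    // Mirrors the Receiver loop: an ack clears the pending entry.
    void onAck(String key) {
        ackMap.remove(key);
    }

    public static void main(String[] args) {
        AckRetrySketch pusher = new AckRetrySketch();
        pusher.push("acked");
        pusher.onAck("acked");
        pusher.retransmitIfPending("acked");    // no-op: already acknowledged

        pusher.push("lost");
        pusher.retransmitIfPending("lost");     // resend
        pusher.retransmitIfPending("lost");     // budget exhausted: counted as failed
        System.out.println("sends=" + pusher.sends + ", failed=" + pusher.failedPush);
    }
}
```

In the real code each retransmission check is scheduled after the ACK timeout; here the checks are invoked by hand so the sequence is easy to follow.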

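In summary, after receiving a heartbeat request the server updates the instance's last heartbeat time and health status, and broadcasts the change.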