4

微服务组件-----Spring Cloud Alibaba 注册中心Nacos的CP架构Raft协议分析 - 忧愁的ch...

 1 year ago
source link: https://www.cnblogs.com/chafry/p/16974486.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

微服务组件-----Spring Cloud Alibaba 注册中心Nacos的CP架构Raft协议分析

  本篇幅是继  注册中心Nacos源码分析 的下半部分。

【1】虽说大部分我们采用注册中心的时候考虑的都是AP架构,为什么呢?因为性能相对于CP架构来说更高,需要等待的时间更少【相对于CP架构,采用的是二段提交,AP架构是直接落盘数据,然后进行数据扩散,来达到最终一致,所以客户端收到响应会更快】;

【2】其次,考虑AP架构会不会存在数据丢失的风险呢?答案是必然的,所以是不是应该考虑CP架构呢?那么问题来了,数据丢失是问题吗?明显不是。基于AP架构的注册中心,明显在客户端那边都会存在重试机制,也就是对于一个集群而言,一台服务器宕机会自动重连到其他机器上去,所以有补充的手段自然也就不考虑CP架构了。

【3】但是对于分布式而言,CP架构确实十分重要的一部分;如zookeeper便是分布式的鼻祖,不过采用的是ZAB强一致性协议,而raft则也是强一致性协议的【两者看似差不多,但也有一些差别】,而且市面上喜欢raft协议的更多。所以研究Raft协议便是比较必要的。

 Raft协议详解

【1】演示网站:http://thesecretlivesofdata.com/raft/

【2】Raft定义角色的三种状态

1)Follower state    //追随者
2)Candidate state  //候选者
3)Leader state        //领导者

【3】流程分析

【3.1】所有的节点都以跟随者状态开始的

【3.2】如果追随者没有收到任何领导人的消息,那么他们就可以成为一名候选者【注意:这里的候选者是一个至于这个候选者是怎么来的还要分析,会不会出现多个的情况,但是能选举成功必然是只有一个候选者】,这也是与ZAB协议的不同之处,ZAB是所有人参与

2168218-20221212194832337-696164568.png

【3.3】候选人向其他节点发送投票请求,而节点对投票请求进行回复

【3.4】候选人从大多数节点获得选票,就将成为领导者

2168218-20221212195620915-1366563229.png

【4】数据一致性的流程分析

【4.1】对系统的所有变化现在都要经过领导【哪怕是你请求从节点,从节点也会转发给主节点】,主节点每个更改都将作为节点日志中的一个条目添加,但此日志条目当前未提交,因此不会更新节点的值【注意:此时客户端只是发送了请求还没收到回复

2168218-20221212195911289-968095670.png

【4.2】要提交条目,节点首先将其复制到跟踪节点【主节点向从节点发送请求】【一般是通过心跳请求,附带上数据来完成的

2168218-20221212200641012-2854070.png

【4.3】领导会等待,直到大多数节点已经写入了条目并回复请求,该条目才会在领导节点上提交【这个时候便会回复客户端的请求

2168218-20221212200759024-869262391.png

【4.4】领导再次发送请求通知追随者条目已提交【还是通过心跳进行二次通知

2168218-20221212201023589-2097667316.png

【4.5】因为标准的Raft中采用的是QUORUM, 即确保至少大部分节点都接收到写操作之后才会返回结果给Client, 而Redis默认采用的实际上是ANY/ONE, 即只要Master节点写入成功后,就立刻返回给Client,然后该写入命令被异步的发送给所有的slave节点,来尽可能地让所有地slave节点与master节点保持一致。

写入类型 解释说明
ZERO 接收到写操作后立即返回,不保证写入成功
ANY/ONE 确保该值被写入至少一个节点
QUORUM  确保至少大多数节点(节点总数/2+1)接受到写操作之后,再返回
ALL 确保所有副本节点都接受到写操作之后,再返回

【5】多节点下出现脑裂问题【Raft如何保持一致】

【5.1】假设有五个分区

2168218-20221213004225878-342767740.png

【5.2】模拟网络故障导致分区,AB为一组,CDE为一组。【由于分区后,CDE没有了领导者,收不到心跳之后,便会达到选举超时的时间,由于C优先超时便会成为候选者】

2168218-20221213004443226-2124771174.png

【5.3】此时便会出现B和C都成为领导者的情况

2168218-20221213005434733-691010818.png

【5.4】如果出现数据更新的情况

【5.4.1】人数少的分区收到写入请求是写入不成功的【因为基于大多数写入成功才算是写入成功,所以最终客户端会是写入请求超时】

2168218-20221213010021826-1539686563.png

【5.4.2】相反数目多的分区【在C写入后,通过心跳传递数据给DE,然后收到回复后明显是符合写入节点数大于或等于(节点总数/2+1)】,所以就会返回客户端响应,并在下一次心跳中让DE进行提交。

2168218-20221213010612781-1453291441.png

【5.5】当分区网络恢复后【其实是根据选举周期判断哪个会保留为领导者,其余将回滚未提交的条目,并匹配新领导者的日志】

2168218-20221213011228143-1088349322.png

【5.6】当然这里面还会出现偶数分区【如ABCD,分为AB,CD,那么其实算是两个分区都不可用,毕竟都不满足(4/2+1)为3的写入】

 结合源码分析写入数据部分的验证

【1】基于上个篇幅的【1.1.2】分析addInstance注册方法,里面会根据ephemeral字段属性判断是持久节点还是临时节点

【1.1】RaftConsistencyServiceImpl类#put方法

@Override
public void put(String key, Record value) throws NacosException {
    checkIsStopWork();
    try {
        //利用自己实现的raftCore类
        raftCore.signalPublish(key, value);
    } catch (Exception e) {
        Loggers.RAFT.error("Raft put failed.", e);
        throw new NacosException(...);
    }
}

【1.2】raftCore类#signalPublish方法是怎么发送和处理数据的【重点

/**
 * Signal publish new record. If not leader, signal to leader. If leader, try to commit publish.
 *
 * @param key   key
 * @param value value
 * @throws Exception any exception during publish
 */
public void signalPublish(String key, Record value) throws Exception {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    //验证了只有领导者才有权利进行写入
    if (!isLeader()) {
        ObjectNode params = JacksonUtils.createEmptyJsonNode();
        params.put("key", key);
        params.replace("value", JacksonUtils.transferToJsonNode(value));
        Map<String, String> parameters = new HashMap<>(1);
        parameters.put("key", key);
        
        final RaftPeer leader = getLeader();
        //非领导者将利用请求转发给领导者,待领导者回复后回复
        raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
        return;
    }
    
    OPERATE_LOCK.lock();
    try {
        final long start = System.currentTimeMillis();
        final Datum datum = new Datum();
        datum.key = key;
        datum.value = value;
        if (getDatum(key) == null) {
            datum.timestamp.set(1L);
        } else {
            datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
        }
        
        ObjectNode json = JacksonUtils.createEmptyJsonNode();
        json.replace("datum", JacksonUtils.transferToJsonNode(datum));
        json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
        //进行数据的发布【1.3】
        onPublish(datum, peers.local());
        
        //接下来这一片,便是数据一致性的实现,大体为
        //先将数据转成json,然后是发起异步请求【不阻塞,会有回调处理】
        //利用CountDownLatch阻塞: peers.size() / 2 + 1,来达成raft的大部分节点响应部分。
        //满足就回复客户端,不满足就阻塞至请求超时
        final String content = json.toString();
        
        final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
        for (final String server : peers.allServersIncludeMyself()) {
            if (isLeader(server)) {
                latch.countDown();
                continue;
            }
            final String url = buildUrl(server, API_ON_PUB);
            HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        Loggers.RAFT
                                .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                        datum.key, server, result.getCode());
                        return;
                    }
                    latch.countDown();
                }
                
                @Override
                public void onError(Throwable throwable) {
                    Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
                }
                
                @Override
                public void onCancel() {
                
                }
            });
            
        }
        
        if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
            // only majority servers return success can we consider this update success
            Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
            throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
        }
        
        long end = System.currentTimeMillis();
        Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
    } finally {
        OPERATE_LOCK.unlock();
    }
}

【1.3】分析数据发布的onPublish方法

/**
 * Do publish. If leader, commit publish to store. If not leader, stop publish because should signal to leader.
 *
 * @param datum  datum
 * @param source source raft peer
 * @throws Exception any exception during publish
 */
public void onPublish(Datum datum, RaftPeer source) throws Exception {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    RaftPeer local = peers.local();
    if (datum.value == null) {
        Loggers.RAFT.warn("received empty datum");
        throw new IllegalStateException("received empty datum");
    }
    
    if (!peers.isLeader(source.ip)) {
        Loggers.RAFT
                .warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source),
                        JacksonUtils.toJson(getLeader()));
        throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");
    }
    
    if (source.term.get() < local.term.get()) {
        Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source),
                JacksonUtils.toJson(local));
        throw new IllegalStateException(
                "out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());
    }
    
    local.resetLeaderDue();
    
    // if data should be persisted, usually this is true:
    if (KeyBuilder.matchPersistentKey(datum.key)) {
        raftStore.write(datum);  //写入磁盘
    }
    
    datums.put(datum.key, datum);
    
    if (isLeader()) {
        local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
    } else {
        if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
            //set leader term:
            getLeader().term.set(source.term.get());
            local.term.set(getLeader().term.get());
        } else {
            local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
        }
    }
    raftStore.updateTerm(local.term.get());
    NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());  //发布事件,异步写入内存
    Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}

【1.4】进行磁盘的写入【同步的】

/**
 * Write datum to cache file.
 *
 * @param datum datum
 * @throws Exception any exception during writing
 */
public synchronized void write(final Datum datum) throws Exception {
    
    String namespaceId = KeyBuilder.getNamespace(datum.key);
    
    File cacheFile = new File(cacheFileName(namespaceId, datum.key));
    
    if (!cacheFile.exists() && !cacheFile.getParentFile().mkdirs() && !cacheFile.createNewFile()) {
        MetricsMonitor.getDiskException().increment();
        
        throw new IllegalStateException("can not make cache file: " + cacheFile.getName());
    }
    
    ByteBuffer data;
    
    data = ByteBuffer.wrap(JacksonUtils.toJson(datum).getBytes(StandardCharsets.UTF_8));
    
    try (FileChannel fc = new FileOutputStream(cacheFile, false).getChannel()) {
        fc.write(data, data.position());
        fc.force(true);
    } catch (Exception e) {
        MetricsMonitor.getDiskException().increment();
        throw e;
    }
    
    // remove old format file:
    if (StringUtils.isNoneBlank(namespaceId)) {
        if (datum.key.contains(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER)) {
            String oldDatumKey = datum.key
                    .replace(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER, StringUtils.EMPTY);
            
            cacheFile = new File(cacheFileName(namespaceId, oldDatumKey));
            if (cacheFile.exists() && !cacheFile.delete()) {
                Loggers.RAFT.error("[RAFT-DELETE] failed to delete old format datum: {}, value: {}", datum.key,
                        datum.value);
                throw new IllegalStateException("failed to delete old format datum: " + datum.key);
            }
        }
    }
}

private String cacheFileName(String namespaceId, String datumKey) {
    String fileName;
    if (StringUtils.isNotBlank(namespaceId)) {
        //对应集群节点下面的data/naming/data/public下面
        fileName = CACHE_DIR + File.separator + namespaceId + File.separator + encodeDatumKey(datumKey);
    } else {
        fileName = CACHE_DIR + File.separator + encodeDatumKey(datumKey);
    }
    return fileName;
}

【1.5】发布事件写入内存【异步的】

//使用快捷键 Ctrl+Shift+f 或 通过 Edit —> Find —>Find In Path 检索 ValueChangeEvent 看对应的 onEvent处理
PersistentNotifier类#onEvent方法
@Override
public void onEvent(ValueChangeEvent event) {
    notify(event.getKey(), event.getAction(), find.apply(event.getKey()));
}

public <T extends Record> void notify(final String key, final DataOperation action, final T value) {
    if (listenerMap.containsKey(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
        if (KeyBuilder.matchServiceMetaKey(key) && !KeyBuilder.matchSwitchKey(key)) {
            for (RecordListener listener : listenerMap.get(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
                try {
                    if (action == DataOperation.CHANGE) {
                        listener.onChange(key, value);
                    }
                    if (action == DataOperation.DELETE) {
                        listener.onDelete(key);
                    }
                } catch (Throwable e) {
                    Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {}", key, e);
                }
            }
        }
    }
    
    if (!listenerMap.containsKey(key)) {
        return;
    }
    
    for (RecordListener listener : listenerMap.get(key)) {
        try {
            if (action == DataOperation.CHANGE) {
                listener.onChange(key, value);
                continue;
            }
            if (action == DataOperation.DELETE) {
                listener.onDelete(key);
            }
        } catch (Throwable e) {
            Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {}", key, e);
        }
    }
}

Service类#onChange方法
@Override
public void onChange(String key, Instances value) throws Exception {
    
    Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
    
    for (Instance instance : value.getInstanceList()) {
        
        if (instance == null) {
            // Reject this abnormal instance list:
            throw new RuntimeException("got null instance " + key);
        }
        
        if (instance.getWeight() > 10000.0D) {
            instance.setWeight(10000.0D);
        }
        
        if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
            instance.setWeight(0.01D);
        }
    }
    
    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
    
    recalculateChecksum();
}

 结合源码分析选举部分的验证

【1】核心类RaftCore类#init()

【1.1】重点说明:该方法利用的是Spring的特性@PostConstruct注解,在初始化类的时候自动启动了两个重要的定时任务【选举任务(MasterElection)和心跳任务(HeartBeat)】

【1.2】代码展示

【2】选举任务的分析

【2.1】选举定时器代码展示

//对选举任务进行注册
public static ScheduledFuture registerMasterElection(Runnable runnable) {
    return NAMING_TIMER_EXECUTOR.scheduleAtFixedRate(runnable, 0, TICK_PERIOD_MS, TimeUnit.MILLISECONDS);
}

//从GlobalExecutor类中可以看出,默认的线程数量为JVM可用超线程的两倍
private static final ScheduledExecutorService NAMING_TIMER_EXECUTOR = ExecutorFactory.Managed
            .newScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),
                    Runtime.getRuntime().availableProcessors() * 2,
                    new NameThreadFactory("com.alibaba.nacos.naming.timer"));

【2.2】选举任务代码展示【在这里你会发现,一个bug,如果先苏醒的节点是缺失数据的节点呢,那么节点必然会丢失,但是这个问题并不严重,就和AP架构是一样的,客户端会重新建立连接补全数据】

public class MasterElection implements Runnable {
    
    @Override
    public void run() {
        try {
            if (stopWork) {
                return;
            }
            if (!peers.isReady()) {
                return;
            }
            
            //这个实在选举的时候会产生
            //(而其中的休眠时间是,随机时间是0-15s,与数据量无关,所以选举出来的leader不一定是数据最全的)
            RaftPeer local = peers.local();
            //定时任务默认是500MS
            local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
            
            if (local.leaderDueMs > 0) {
                return;
            }
            
            // reset timeout
            //重置选举时间【15s-20s】
            local.resetLeaderDue();
            //重置心跳时间5s
            local.resetHeartbeatDue();
            //发起投票
            sendVote();
        } catch (Exception e) {
            Loggers.RAFT.warn("[RAFT] error while master election {}", e);
        }
        
    }
    
    private void sendVote() {
        
        RaftPeer local = peers.get(NetUtils.localServer());
        Loggers.RAFT.info(...);
        
        //将之前的选票信息清空
        peers.reset();
        //增加选举周期
        local.term.incrementAndGet();
        local.voteFor = local.ip;
        //设置当前节点为候选者
        local.state = RaftPeer.State.CANDIDATE;
        
        Map<String, String> params = new HashMap<>(1);
        params.put("vote", JacksonUtils.toJson(local));
        //遍历集群
        for (final String server : peers.allServersWithoutMySelf()) {
            final String url = buildUrl(server, API_VOTE);
            try {
                //发送异步请求
                HttpClient.asyncHttpPost(url, null, params, new Callback<String>() {
                    @Override
                    public void onReceive(RestResult<String> result) {
                        if (!result.ok()) {
                            Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", result.getCode(), url);
                            return;
                        }
                        
                        RaftPeer peer = JacksonUtils.toObj(result.getData(), RaftPeer.class);
                        
                        Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer));
                        
                        peers.decideLeader(peer);
                        
                    }
                    
                    @Override
                    public void onError(Throwable throwable) {...}
                    
                    @Override
                    public void onCancel() {}
                });
            } catch (Exception e) {...}
        }
    }
}

//异步情况下对于受到回复
public RaftPeer decideLeader(RaftPeer candidate) {
    peers.put(candidate.ip, candidate);
    
    SortedBag ips = new TreeBag();
    int maxApproveCount = 0;
    String maxApprovePeer = null;
    for (RaftPeer peer : peers.values()) {
        if (StringUtils.isEmpty(peer.voteFor)) {
            continue;
        }
        
        ips.add(peer.voteFor);
        if (ips.getCount(peer.voteFor) > maxApproveCount) {
            maxApproveCount = ips.getCount(peer.voteFor);
            maxApprovePeer = peer.voteFor;
        }
    }
    
    if (maxApproveCount >= majorityCount()) {
        RaftPeer peer = peers.get(maxApprovePeer);
        peer.state = RaftPeer.State.LEADER;
        
        if (!Objects.equals(leader, peer)) {
            leader = peer;
            ApplicationUtils.publishEvent(new LeaderElectFinishedEvent(this, leader, local()));
            Loggers.RAFT.info("{} has become the LEADER", leader.ip);
        }
    }
    
    return leader;
}

【2.3】接收到选票的处理

//RaftController类
@PostMapping("/vote")
public JsonNode vote(HttpServletRequest request, HttpServletResponse response) throws Exception {
    if (versionJudgement.allMemberIsNewVersion()) {
        throw new IllegalStateException("old raft protocol already stop");
    }
    RaftPeer peer = raftCore.receivedVote(JacksonUtils.toObj(WebUtils.required(request, "vote"), RaftPeer.class));
    
    return JacksonUtils.transferToJsonNode(peer);
}

public synchronized RaftPeer receivedVote(RaftPeer remote) {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    if (!peers.contains(remote)) {
        throw new IllegalStateException("can not find peer: " + remote.ip);
    }
    
    RaftPeer local = peers.get(NetUtils.localServer());
    //这种其实是应对两个节点都发送了选票环节,那么肯定不会去选对方
    if (remote.term.get() <= local.term.get()) {
        String msg = "received illegitimate vote" + ", voter-term:" + remote.term + ", votee-term:" + local.term;
        
        Loggers.RAFT.info(msg);
        if (StringUtils.isEmpty(local.voteFor)) {
            local.voteFor = local.ip;
        }
        
        return local;
    }
    //将选举时间重置,保证这次选举不成功会进入下一轮休眠
    local.resetLeaderDue();
    
    local.state = RaftPeer.State.FOLLOWER;
    local.voteFor = remote.ip;
    //这里的周期变更的原因是,如果这轮选举不成功的话,下一轮万一我先醒了保证选举周期是最大的
    local.term.set(remote.term.get());
    
    Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);
    
    return local;
}

【3】心跳任务的分析

【3.1】心跳定时器代码展示

//定义了心跳任务500ms一次,而且与选举任务是同一个定时器
public static ScheduledFuture registerHeartbeat(Runnable runnable) {
    return NAMING_TIMER_EXECUTOR.scheduleWithFixedDelay(runnable, 0, TICK_PERIOD_MS, TimeUnit.MILLISECONDS);
}

【3.2】心跳任务代码展示【说白了也就是会发送列表的key值【减少数据量,而对应从节点自动拉取来补全数据】过去,采用时间戳作为版本号】

public class HeartBeat implements Runnable {
    
    //非领导者不可发心跳
    @Override
    public void run() {
        try {
            if (stopWork) {
                return;
            }
            if (!peers.isReady()) {
                return;
            }
            
            RaftPeer local = peers.local();
            //心跳时间设置在5s内的随机数值
            local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
            if (local.heartbeatDueMs > 0) {
                return;
            }
            
            local.resetHeartbeatDue();
            
            sendBeat();
        } catch (Exception e) {...}
        
    }
    
    private void sendBeat() throws IOException, InterruptedException {
        RaftPeer local = peers.local();
        if (EnvUtil.getStandaloneMode() || local.state != RaftPeer.State.LEADER) {
            return;
        }
        if (Loggers.RAFT.isDebugEnabled()) {..日志部分忽略..}
        
        local.resetLeaderDue();
        
        // build data
        ObjectNode packet = JacksonUtils.createEmptyJsonNode();
        packet.replace("peer", JacksonUtils.transferToJsonNode(local));
        
        ArrayNode array = JacksonUtils.createEmptyArrayNode();
        
        if (switchDomain.isSendBeatOnly()) {
            Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", switchDomain.isSendBeatOnly());
        }
        
        if (!switchDomain.isSendBeatOnly()) {
            for (Datum datum : datums.values()) {
                
                ObjectNode element = JacksonUtils.createEmptyJsonNode();
                
                if (KeyBuilder.matchServiceMetaKey(datum.key)) {
                    element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
                } else if (KeyBuilder.matchInstanceListKey(datum.key)) {
                    element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
                }
                //利用时间戳来当版本号
                element.put("timestamp", datum.timestamp.get());
                
                array.add(element);
            }
        }
        
        packet.replace("datums", array);
        // broadcast
        Map<String, String> params = new HashMap<String, String>(1);
        params.put("beat", JacksonUtils.toJson(packet));
        
        String content = JacksonUtils.toJson(params);
        
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        GZIPOutputStream gzip = new GZIPOutputStream(out);
        gzip.write(content.getBytes(StandardCharsets.UTF_8));
        gzip.close();
        
        byte[] compressedBytes = out.toByteArray();
        String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);
        
        if (Loggers.RAFT.isDebugEnabled()) {
            Loggers.RAFT.debug("raw beat data size: {}, size of compressed data: {}", content.length(),
                    compressedContent.length());
        }
        
        for (final String server : peers.allServersWithoutMySelf()) {
            try {
                final String url = buildUrl(server, API_BEAT);
                if (Loggers.RAFT.isDebugEnabled()) {
                    Loggers.RAFT.debug("send beat to server " + server);
                }
                HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new Callback<String>() {
                    @Override
                    public void onReceive(RestResult<String> result) {
                        if (!result.ok()) {
                            Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}", result.getCode(), server);
                            MetricsMonitor.getLeaderSendBeatFailedException().increment();
                            return;
                        }
                        
                        peers.update(JacksonUtils.toObj(result.getData(), RaftPeer.class));
                        if (Loggers.RAFT.isDebugEnabled()) {
                            Loggers.RAFT.debug("receive beat response from: {}", url);
                        }
                    }
                    
                    @Override
                    public void onError(Throwable throwable) {
                        Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server,
                                throwable);
                        MetricsMonitor.getLeaderSendBeatFailedException().increment();
                    }
                    
                    @Override
                    public void onCancel() {
                    
                    }
                });
            } catch (Exception e) {
                Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
                MetricsMonitor.getLeaderSendBeatFailedException().increment();
            }
        }
        
    }
}

【3.3】接收到心跳的处理

//RaftController类
@PostMapping("/beat")
public JsonNode beat(HttpServletRequest request, HttpServletResponse response) throws Exception {
    if (versionJudgement.allMemberIsNewVersion()) {
        throw new IllegalStateException("old raft protocol already stop");
    }
    //解压,转义
    String entity = new String(IoUtils.tryDecompress(request.getInputStream()), StandardCharsets.UTF_8);
    String value = URLDecoder.decode(entity, "UTF-8");
    value = URLDecoder.decode(value, "UTF-8");
    
    JsonNode json = JacksonUtils.toObj(value);
    
    RaftPeer peer = raftCore.receivedBeat(JacksonUtils.toObj(json.get("beat").asText()));
    
    return JacksonUtils.transferToJsonNode(peer);
}

//处理心跳请求
public RaftPeer receivedBeat(JsonNode beat) throws Exception {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    final RaftPeer local = peers.local();
    final RaftPeer remote = new RaftPeer();
    JsonNode peer = beat.get("peer");
    remote.ip = peer.get("ip").asText();
    remote.state = RaftPeer.State.valueOf(peer.get("state").asText());
    remote.term.set(peer.get("term").asLong());
    remote.heartbeatDueMs = peer.get("heartbeatDueMs").asLong();
    remote.leaderDueMs = peer.get("leaderDueMs").asLong();
    remote.voteFor = peer.get("voteFor").asText();
    
    if (remote.state != RaftPeer.State.LEADER) {
        Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}", remote.state,
                JacksonUtils.toJson(remote));
        throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
    }
    
    if (local.term.get() > remote.term.get()) {
        Loggers.RAFT
                .info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}",
                        remote.term.get(), local.term.get(), JacksonUtils.toJson(remote), local.leaderDueMs);
        throw new IllegalArgumentException(
                "out of date beat, beat-from-term: " + remote.term.get() + ", beat-to-term: " + local.term.get());
    }
    
    if (local.state != RaftPeer.State.FOLLOWER) {
        
        Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JacksonUtils.toJson(remote));
        // mk follower
        local.state = RaftPeer.State.FOLLOWER;
        local.voteFor = remote.ip;
    }
    
    final JsonNode beatDatums = beat.get("datums");
    //重置心跳和选举任务的时间,这一步确保当领导者宕机后,集群节点因为没有收到领导者的心跳而重新发起选举
    local.resetLeaderDue();
    local.resetHeartbeatDue();
    
    peers.makeLeader(remote);
    
    if (!switchDomain.isSendBeatOnly()) {
        
        Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size());
        //先将本地数据设置为0,后面将领导有的设置为1,最后面根据设置还为0的便是要删除的
        for (Map.Entry<String, Datum> entry : datums.entrySet()) {
            receivedKeysMap.put(entry.getKey(), 0);
        }
        
        // now check datums
        List<String> batch = new ArrayList<>();
        
        int processedCount = 0;
        if (Loggers.RAFT.isDebugEnabled()) {
            Loggers.RAFT
                    .debug("[RAFT] received beat with {} keys, RaftCore.datums' size is {}, remote server: {}, term: {}, local term: {}",
                            beatDatums.size(), datums.size(), remote.ip, remote.term, local.term);
        }
        for (Object object : beatDatums) {
            processedCount = processedCount + 1;
            
            JsonNode entry = (JsonNode) object;
            String key = entry.get("key").asText();
            final String datumKey;
            
            if (KeyBuilder.matchServiceMetaKey(key)) {
                datumKey = KeyBuilder.detailServiceMetaKey(key);
            } else if (KeyBuilder.matchInstanceListKey(key)) {
                datumKey = KeyBuilder.detailInstanceListkey(key);
            } else {
                // ignore corrupted key:
                continue;
            }
            
            long timestamp = entry.get("timestamp").asLong();
            
            receivedKeysMap.put(datumKey, 1);
            
            try {
                if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp
                        && processedCount < beatDatums.size()) {
                    continue;
                }
                
                if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
                    batch.add(datumKey);
                }
                
                //利用批处理来提高性能,满五十条进行一次处理
                //或者不满但是,现在已经到了最后了,就再一批次全部处理
                if (batch.size() < 50 && processedCount < beatDatums.size()) {
                    continue;
                }
                
                String keys = StringUtils.join(batch, ",");
                
                //如果批次里面没有数据就不进行处理
                if (batch.size() <= 0) {
                    continue;
                }
                
                Loggers.RAFT.info("get datums from leader: {}, batch size is {}, processedCount is {}"
                                + ", datums' size is {}, RaftCore.datums' size is {}", getLeader().ip, batch.size(),
                        processedCount, beatDatums.size(), datums.size());
                
                // update datum entry
                String url = buildUrl(remote.ip, API_GET);
                Map<String, String> queryParam = new HashMap<>(1);
                queryParam.put("keys", URLEncoder.encode(keys, "UTF-8"));
                HttpClient.asyncHttpGet(url, null, queryParam, new Callback<String>() {
                    @Override
                    public void onReceive(RestResult<String> result) {
                        if (!result.ok()) {
                            return;
                        }
                        
                        List<JsonNode> datumList = JacksonUtils
                                .toObj(result.getData(), new TypeReference<List<JsonNode>>() {
                                });
                        
                        for (JsonNode datumJson : datumList) {
                            Datum newDatum = null;
                            OPERATE_LOCK.lock();
                            try {
                                
                                Datum oldDatum = getDatum(datumJson.get("key").asText());
                                
                                if (oldDatum != null && datumJson.get("timestamp").asLong() <= oldDatum.timestamp
                                        .get()) {
                                    Loggers.RAFT
                                            .info("[NACOS-RAFT] timestamp is smaller than that of mine, key: {}, remote: {}, local: {}",
                                                    datumJson.get("key").asText(),
                                                    datumJson.get("timestamp").asLong(), oldDatum.timestamp);
                                    continue;
                                }
                                
                                if (KeyBuilder.matchServiceMetaKey(datumJson.get("key").asText())) {
                                    Datum<Service> serviceDatum = new Datum<>();
                                    serviceDatum.key = datumJson.get("key").asText();
                                    serviceDatum.timestamp.set(datumJson.get("timestamp").asLong());
                                    serviceDatum.value = JacksonUtils
                                            .toObj(datumJson.get("value").toString(), Service.class);
                                    newDatum = serviceDatum;
                                }
                                
                                if (KeyBuilder.matchInstanceListKey(datumJson.get("key").asText())) {
                                    Datum<Instances> instancesDatum = new Datum<>();
                                    instancesDatum.key = datumJson.get("key").asText();
                                    instancesDatum.timestamp.set(datumJson.get("timestamp").asLong());
                                    instancesDatum.value = JacksonUtils
                                            .toObj(datumJson.get("value").toString(), Instances.class);
                                    newDatum = instancesDatum;
                                }
                                
                                if (newDatum == null || newDatum.value == null) {
                                    Loggers.RAFT.error("receive null datum: {}", datumJson);
                                    continue;
                                }
                                
                                raftStore.write(newDatum);
                                
                                datums.put(newDatum.key, newDatum);
                                notifier.notify(newDatum.key, DataOperation.CHANGE, newDatum.value);
                                
                                local.resetLeaderDue();
                                
                                if (local.term.get() + 100 > remote.term.get()) {
                                    getLeader().term.set(remote.term.get());
                                    local.term.set(getLeader().term.get());
                                } else {
                                    local.term.addAndGet(100);
                                }
                                
                                raftStore.updateTerm(local.term.get());
                                
                                Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",
                                        newDatum.key, newDatum.timestamp, JacksonUtils.toJson(remote), local.term);
                                
                            } catch (Throwable e) {
                                Loggers.RAFT
                                        .error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum,
                                                e);
                            } finally {
                                OPERATE_LOCK.unlock();
                            }
                        }
                        try {
                            TimeUnit.MILLISECONDS.sleep(200);
                        } catch (InterruptedException e) {
                            Loggers.RAFT.error("[RAFT-BEAT] Interrupted error ", e);
                        }
                        return;
                    }
                    
                    @Override
                    public void onError(Throwable throwable) {
                        Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader", throwable);
                    }
                    
                    @Override
                    public void onCancel() {
                    
                    }
                    
                });
                
                batch.clear();
                
            } catch (Exception e) {
                Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);
            }
            
        }
        
        List<String> deadKeys = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
            if (entry.getValue() == 0) {
                deadKeys.add(entry.getKey());
            }
        }
        //批量删除
        for (String deadKey : deadKeys) {
            try {
                deleteDatum(deadKey);
            } catch (Exception e) {
                Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);
            }
        }
        
    }
    
    return local;
}

Nacos集群数据一致性(持久化实例CP模式Raft协议实现)

2168218-20221211211022916-1594118886.jpg

注册中心CAP架构剖析

2168218-20221211210840038-1591821095.jpg

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK