4

Java实现平滑加权轮询算法--降权和提权 - 渊渟岳

 2 years ago
source link: https://www.cnblogs.com/dennyLee2025/p/16138174.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

上一篇讲了普通轮询、加权轮询的两种实现方式,重点讲了平滑加权轮询算法,并在文末留下了悬念:节点出现分配失败时降低有效权重值;成功时提高有效权重值(但不能大于weight值)

本文在平滑加权轮询算法的基础上讲,还没弄懂的可以看上一篇文章。

现在来模拟实现:平滑加权轮询算法的降权和提权

1.两个关键点

节点宕机时,降低有效权重值;

节点正常时,提高有效权重值(但不能大于weight值);

注意:降低或提高权重都是针对有效权重

2.代码实现

2.1.服务节点类

package com.yty.loadbalancingalgorithm.wrr;

/**
 * String ip:负载IP
 * final Integer weight:权重,保存配置的权重
 * Integer effectiveWeight:有效权重,轮询的过程权重可能变化
 * Integer currentWeight:当前权重,比对该值大小获取节点
 *   第一次加权轮询时:currentWeight = weight = effectiveWeight
 *   后面每次加权轮询时:currentWeight 的值都会不断变化,其他权重不变
 * Boolean isAvailable:是否存活
 */
public class ServerNode implements Comparable<ServerNode>{
    private String ip;
    private final Integer weight;
    private Integer effectiveWeight;
    private Integer currentWeight;
    private Boolean isAvailable;

    public ServerNode(String ip, Integer weight){
        this(ip,weight,true);
    }
    public ServerNode(String ip, Integer weight,Boolean isAvailable){
        this.ip = ip;
        this.weight = weight;
        this.effectiveWeight = weight;
        this.currentWeight = weight;
        this.isAvailable = isAvailable;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public Integer getWeight() {
        return weight;
    }

    public Integer getEffectiveWeight() {
        return effectiveWeight;
    }

    public void setEffectiveWeight(Integer effectiveWeight) {
        this.effectiveWeight = effectiveWeight;
    }

    public Integer getCurrentWeight() {
        return currentWeight;
    }

    public void setCurrentWeight(Integer currentWeight) {
        this.currentWeight = currentWeight;
    }

    public Boolean isAvailable() {
        return isAvailable;
    }
    public void setIsAvailable(Boolean isAvailable){
        this.isAvailable = isAvailable;
    }

    // 每成功一次,恢复有效权重1,不超过配置的起始权重
    public void onInvokeSuccess(){
        if(effectiveWeight < weight) effectiveWeight++;
    }
    // 每失败一次,有效权重减少1,无底线的减少
    public void onInvokeFault(){
        effectiveWeight--;
    }

    @Override
    public int compareTo(ServerNode node) {
        return currentWeight > node.currentWeight ? 1 : (currentWeight.equals(node.currentWeight) ? 0 : -1);
    }

    @Override
    public String toString() {
        return "{ip='" + ip + "', weight=" + weight + ", effectiveWeight=" + effectiveWeight
                + ", currentWeight=" + currentWeight + ", isAvailable=" + isAvailable + "}";
    }
}

2.2.平滑轮询算法降权和提权

package com.yty.loadbalancingalgorithm.wrr;

import java.util.ArrayList;
import java.util.List;

/**
 * 加权轮询算法:加入存活状态,降权使宕机权重降低,从而不会被选中
 */
public class WeightedRoundRobinAvailable {

    private static List<ServerNode> serverNodes = new ArrayList<>();
    // 准备模拟数据
    static {
        serverNodes.add(new ServerNode("192.168.1.101",1));// 默认为true
        serverNodes.add(new ServerNode("192.168.1.102",3,false));
        serverNodes.add(new ServerNode("192.168.1.103",2));
    }

    /**
     * 按照当前权重(currentWeight)最大值获取IP
     * @return ServerNode
     */
    public ServerNode selectNode(){
        if (serverNodes.size() <= 0) return null;
        if (serverNodes.size() == 1)
            return (serverNodes.get(0).isAvailable()) ? serverNodes.get(0) : null;
        
        // 权重之和
        Integer totalWeight = 0;
        ServerNode nodeOfMaxWeight = null; // 保存轮询选中的节点信息
        synchronized (serverNodes){
            StringBuffer sb1 = new StringBuffer();
            StringBuffer sb2 = new StringBuffer();
            sb1.append(Thread.currentThread().getName()+"==加权轮询--[当前权重]值的变化:"+printCurrentWeight(serverNodes));
            // 有限权重总和可能发生变化
            for(ServerNode serverNode : serverNodes){
                totalWeight += serverNode.getEffectiveWeight();
            }

            // 选出当前权重最大的节点
            ServerNode tempNodeOfMaxWeight = serverNodes.get(0);
            for (ServerNode serverNode : serverNodes) {
                if (serverNode.isAvailable()) {
                    serverNode.onInvokeSuccess();//提权
                    sb2.append(Thread.currentThread().getName()+"==[正常节点]:"+serverNode+"\n");
                } else {
                    serverNode.onInvokeFault();//降权
                    sb2.append(Thread.currentThread().getName()+"==[宕机节点]:"+serverNode+"\n");
                }

                tempNodeOfMaxWeight = tempNodeOfMaxWeight.compareTo(serverNode) > 0 ? tempNodeOfMaxWeight : serverNode;
            }
            // 必须new个新的节点实例来保存信息,否则引用指向同一个堆实例,后面的set操作将会修改节点信息
            nodeOfMaxWeight = new ServerNode(tempNodeOfMaxWeight.getIp(),tempNodeOfMaxWeight.getWeight(),tempNodeOfMaxWeight.isAvailable());
            nodeOfMaxWeight.setEffectiveWeight(tempNodeOfMaxWeight.getEffectiveWeight());
            nodeOfMaxWeight.setCurrentWeight(tempNodeOfMaxWeight.getCurrentWeight());

            // 调整当前权重比:按权重(effectiveWeight)的比例进行调整,确保请求分发合理。
            tempNodeOfMaxWeight.setCurrentWeight(tempNodeOfMaxWeight.getCurrentWeight() - totalWeight);
            sb1.append(" -> "+printCurrentWeight(serverNodes));

            serverNodes.forEach(serverNode -> serverNode.setCurrentWeight(serverNode.getCurrentWeight()+serverNode.getEffectiveWeight()));

            sb1.append(" -> "+printCurrentWeight(serverNodes));
            System.out.print(sb2);  //所有节点的当前信息
            System.out.println(sb1); //打印当前权重变化过程
        }
        return nodeOfMaxWeight;
    }

    // 格式化打印信息
    private String printCurrentWeight(List<ServerNode> serverNodes){
        StringBuffer stringBuffer = new StringBuffer("[");
        serverNodes.forEach(node -> stringBuffer.append(node.getCurrentWeight()+",") );
        return stringBuffer.substring(0, stringBuffer.length() - 1) + "]";
    }

    // 并发测试:两个线程循环获取节点
    public static void main(String[] args) throws InterruptedException {
        // 循环次数
        int loop = 18;

        new Thread(() -> {
            WeightedRoundRobinAvailable weightedRoundRobin1 = new WeightedRoundRobinAvailable();
            for(int i=1;i<=loop;i++){
                ServerNode serverNode = weightedRoundRobin1.selectNode();
                System.out.println(Thread.currentThread().getName()+"==第"+i+"次轮询选中[当前权重最大]的节点:" + serverNode + "\n");
            }
        }).start();
        //
        new Thread(() -> {
            WeightedRoundRobinAvailable weightedRoundRobin2 = new WeightedRoundRobinAvailable();
            for(int i=1;i<=loop;i++){
                ServerNode serverNode = weightedRoundRobin2.selectNode();
                System.out.println(Thread.currentThread().getName()+"==第"+i+"次轮询选中[当前权重最大]的节点:" + serverNode + "\n");
            }
        }).start();

        //main 线程睡了一下,再偷偷把 所有宕机 拉起来:模拟服务器恢复正常
        Thread.sleep(5);
        for (ServerNode serverNode:serverNodes){
            if(!serverNode.isAvailable())
                serverNode.setIsAvailable(true);
        }
    }
}

3.分析结果

执行结果:将执行结果的前中后四次抽出来分析

Thread-0==[正常节点]:{ip='192.168.1.101', weight=1, effectiveWeight=1, currentWeight=1, isAvailable=true}

Thread-0==[宕机节点]:{ip='192.168.1.102', weight=3, effectiveWeight=2, currentWeight=3, isAvailable=false}

Thread-0==[正常节点]:{ip='192.168.1.103', weight=2, effectiveWeight=2, currentWeight=2, isAvailable=true}

Thread-0==加权轮询--[当前权重]值的变化:[1,3,2] -> [1,-3,2] -> [2,-1,4]

Thread-0==第1次轮询选中[当前权重最大]的节点:{ip='192.168.1.102', weight=3, effectiveWeight=2, currentWeight=3, isAvailable=false}

Thread-1==[正常节点]:{ip='192.168.1.101', weight=1, effectiveWeight=1, currentWeight=6, isAvailable=true}

Thread-1==[宕机节点]:{ip='192.168.1.102', weight=3, effectiveWeight=-7, currentWeight=-21, isAvailable=false}

Thread-1==[正常节点]:{ip='192.168.1.103', weight=2, effectiveWeight=2, currentWeight=12, isAvailable=true}

Thread-1==加权轮询--[当前权重]值的变化:[6,-21,12] -> [6,-21,15] -> [7,-28,17]

Thread-1==第5次轮询选中[当前权重最大]的节点:{ip='192.168.1.103', weight=2, effectiveWeight=2, currentWeight=12, isAvailable=true}

Thread-0==[正常节点]:{ip='192.168.1.101', weight=1, effectiveWeight=1, currentWeight=13, isAvailable=true}

Thread-0==[正常节点]:{ip='192.168.1.102', weight=3, effectiveWeight=3, currentWeight=-19, isAvailable=true}

Thread-0==[正常节点]:{ip='192.168.1.103', weight=2, effectiveWeight=2, currentWeight=12, isAvailable=true}

Thread-0==加权轮询--[当前权重]值的变化:[13,-19,12] -> [7,-19,12] -> [8,-16,14]

Thread-0==第15次轮询选中[当前权重最大]的节点:{ip='192.168.1.101', weight=1, effectiveWeight=1, currentWeight=13, isAvailable=true}

Thread-1==[正常节点]:{ip='192.168.1.101', weight=1, effectiveWeight=1, currentWeight=2, isAvailable=true}

Thread-1==[正常节点]:{ip='192.168.1.102', weight=3, effectiveWeight=3, currentWeight=2, isAvailable=true}

Thread-1==[正常节点]:{ip='192.168.1.103', weight=2, effectiveWeight=2, currentWeight=2, isAvailable=true}

Thread-1==加权轮询--[当前权重]值的变化:[2,2,2] -> [2,2,-4] -> [3,5,-2]

Thread-1==第18次轮询选中[当前权重最大]的节点:{ip='192.168.1.103', weight=2, effectiveWeight=2, currentWeight=2, isAvailable=true}

分析

一开始权重最高的节点虽然是宕机了,但是还是会被选中并返回;

“有效权重总和” 和 “当前权重总和”都减少了1,因为设置轮询到失败节点,都会自减1;

到第5次轮询时,当前权重已经变成了[7,-28,17],可以看出宕机节点越往后当前权重越小,所以后面根本不会再选中宕机节点,虽然没剔除故障节点,但却起到不分配宕机节点

到第15次轮询时,有效权重已经恢复起始值,当前权重变为[8,-16,14],当前权重只能慢慢恢复,并不是节点一正常就立即恢复宕机过的节点,起到对故障节点的缓冲恢复(故障过的节点可能还存在问题);

最后1次轮询时,因为没有宕机节点,所以有效权重不变,当前权重已经恢复[3,5,-2],如果再轮询一次,那就会访问到一开始故障的节点了。

降权起到缓慢“剔除”宕机节点的效果;提权起到缓冲恢复宕机节点的效果。

对比上一篇文章可以看到:

当前权重(currentWeight):针对的是节点的选择,受有效权重影响,起到缓慢“剔除”宕机节点和缓冲恢复宕机节点的效果,当前权重最高就会被选择;

有效权重(effectiveWeight):针对的是权重的变化,也即是降权和提权,降权/提权只会直接操作有效权重;

权重(weight):针对的是存储起始配置,限定有效权重的提权。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK