
Akka Getting Started Series (6): Routing and Load Balancing in Akka Cluster

source link: http://edisonxu.com/2018/11/14/akka-cluster-router.html

Before using routing, let's first go over the basic concepts:

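  • Router: messages are sent to the router from outside, and the router forwards them to a concrete worker according to a routing algorithm. It acts as a relay station for messages.
  • Routee: the routing target, where the message is ultimately processed.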

Akka offers two ways to do routing:

  • Use akka.routing.Router directly
  • Use a built-in router actor

Using the Router class directly

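Using akka.routing.Router directly works on the same principle as the simplest example in the previous chapter, except that Akka's Router class is more sophisticated and more powerful than the one we wrote. Creating a Router takes two arguments: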

  • Routing logic
    Akka provides the following built-in routing logic classes for Router:

    • akka.routing.RoundRobinRoutingLogic
    • akka.routing.RandomRoutingLogic
    • akka.routing.SmallestMailboxRoutingLogic
    • akka.routing.BroadcastRoutingLogic
    • akka.routing.ScatterGatherFirstCompletedRoutingLogic
    • akka.routing.TailChoppingRoutingLogic
    • akka.routing.ConsistentHashingRoutingLogic
      See the table at the end of the article for a description of each algorithm.
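  • The sequence of routees
    This sequence can be changed dynamically by calling router.addRoutee and router.removeRoutee. Note, however, that akka.routing.Router is an immutable, thread-safe class. Such a "change" actually copies the router's routee list with the given routee added or removed and produces a new Router, as Akka's own removeRoutee shows: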

    def removeRoutee(routee: Routee): Router = copy(routees = routees.filterNot(_ == routee))
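To make this copy-on-write behaviour concrete, here is a small plain-Java model of an immutable round-robin router. It is not akka.routing.Router: the class name ImmutableRouter is hypothetical and routees are plain strings for illustration. As in Akka's RoundRobinRoutingLogic, the round-robin counter belongs to the routing logic, so every copy of the router shares it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical plain-Java model of an immutable router (not akka.routing.Router).
// Routees are plain strings here; "changing" the router always returns a new copy.
final class ImmutableRouter {
    private final List<String> routees;
    private final AtomicLong next; // round-robin counter, shared by all copies

    ImmutableRouter(List<String> routees) {
        this(routees, new AtomicLong());
    }

    private ImmutableRouter(List<String> routees, AtomicLong next) {
        this.routees = Collections.unmodifiableList(new ArrayList<>(routees));
        this.next = next;
    }

    List<String> routees() {
        return routees;
    }

    // Copy with the routee appended; this instance is left untouched.
    ImmutableRouter addRoutee(String routee) {
        List<String> copy = new ArrayList<>(routees);
        copy.add(routee);
        return new ImmutableRouter(copy, next);
    }

    // Copy without the routee, like routees.filterNot(_ == routee) in Akka.
    ImmutableRouter removeRoutee(String routee) {
        List<String> copy = new ArrayList<>(routees);
        copy.removeIf(r -> r.equals(routee));
        return new ImmutableRouter(copy, next);
    }

    // Round-robin pick; returns null when empty (Akka would answer with NoRoutee).
    String select() {
        if (routees.isEmpty()) {
            return null;
        }
        return routees.get((int) (next.getAndIncrement() % routees.size()));
    }
}

class ImmutableRouterDemo {
    public static void main(String[] args) {
        ImmutableRouter empty = new ImmutableRouter(List.of());
        ImmutableRouter two = empty.addRoutee("slave-1").addRoutee("slave-2");
        System.out.println(empty.routees());                    // []
        System.out.println(two.select() + " " + two.select()); // slave-1 slave-2
        ImmutableRouter one = two.removeRoutee("slave-1");
        System.out.println(one.select());                       // slave-2
    }
}
```

This is why the code must always reassign the result (router = router.addRoutee(...)): calling addRoutee and discarding the return value changes nothing.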
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-cluster-tools_2.12</artifactId>
</dependency>

Configuration file application.conf

akka {
  actor {
    provider = "cluster"
  }
  remote {
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
    artery {
      enabled = off
      canonical.hostname = "127.0.0.1"
      canonical.port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551"
    ]
  }
}

SlaveActor, which does the actual work

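import akka.actor.AbstractActor;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class SlaveActor extends AbstractActor {

    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                // Log every String together with this actor's full path, so we can see which node handled it
                .match(String.class, word -> log.info("Node {} receives: {}", getSelf().path().toSerializationFormat(), word))
                .build();
    }

    public static void main(String[] args) {
        // Start a node with the "slave" role; the port comes from application.conf (0 = random free port)
        Config config =
                ConfigFactory.parseString("akka.cluster.roles = [slave]")
                        .withFallback(ConfigFactory.load());

        ActorSystem system = ActorSystem.create("ClusterSystem", config);
        // Created at /user/slaveActor, the path MasterActor looks up
        system.actorOf(Props.create(SlaveActor.class), "slaveActor");
    }
}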
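SlaveActor.main builds its Config by parsing a small override from a string and falling back to application.conf (MasterActor.main below does the same with two layers). The plain-Java model below shows the precedence this relies on: keys defined by the receiver win, and the fallback only supplies keys the receiver lacks. LayeredConfig is a hypothetical name, and unlike Typesafe Config it only handles flat keys and does not merge nested objects.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical flat model of withFallback (not Typesafe Config itself, which also
// merges nested objects): keys of the receiver win, the fallback fills the gaps.
final class LayeredConfig {
    private final Map<String, String> values;

    LayeredConfig(Map<String, String> values) {
        this.values = new HashMap<>(values);
    }

    LayeredConfig withFallback(LayeredConfig fallback) {
        Map<String, String> merged = new HashMap<>(fallback.values);
        merged.putAll(values); // receiver overrides fallback
        return new LayeredConfig(merged);
    }

    String get(String key) {
        return values.get(key);
    }
}

class LayeredConfigDemo {
    public static void main(String[] args) {
        // Stand-in for ConfigFactory.load(): values that would come from application.conf
        LayeredConfig applicationConf = new LayeredConfig(Map.of(
                "akka.actor.provider", "cluster",
                "akka.remote.netty.tcp.port", "0"));
        LayeredConfig slave = new LayeredConfig(Map.of("akka.cluster.roles", "[slave]"))
                .withFallback(applicationConf);
        System.out.println(slave.get("akka.cluster.roles"));         // [slave]
        System.out.println(slave.get("akka.remote.netty.tcp.port")); // 0
    }
}
```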

MasterActor, which contains the router

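import akka.actor.AbstractActor;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.routing.RoundRobinRoutingLogic;
import akka.routing.Router;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.ArrayList;
// ClusterHttpManagement and AkkaManagement come from akka-management;
// import them from the packages of the akka-management version you use.

public class MasterActor extends AbstractActor {

    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    // Starts with no routees; Router is immutable, so every change reassigns this field
    private Router router = new Router(new RoundRobinRoutingLogic(), new ArrayList<>());
    private final Cluster cluster = Cluster.get(getContext().system());
    private boolean isReady = false;
    private static final String SLAVE_PATH = "/user/slaveActor";

    @Override
    public void preStart() {
        // initialStateAsEvents(): members that are already Up are delivered as MemberUp events too
        cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(),
                ClusterEvent.MemberEvent.class, ClusterEvent.ReachabilityEvent.class);
    }

    @Override
    public void postStop() {
        cluster.unsubscribe(getSelf());
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(String.class, msg -> {
                    log.info("Master got: {}", msg);
                    if (!isReady) {
                        log.warning("Is not ready yet!");
                    } else {
                        log.info("Routee size: {}", router.routees().length());
                        router.route(msg, getSender());
                    }
                })
                .match(ClusterEvent.MemberUp.class, mUp -> {
                    // Only nodes started with the "slave" role host a SlaveActor
                    if (mUp.member().hasRole("slave")) {
                        Address address = mUp.member().address();
                        ActorSelection selection = getContext().actorSelection(address + SLAVE_PATH);
                        router = router.addRoutee(selection);
                        isReady = true;
                        log.info("New routee is added!");
                    }
                })
                .match(ClusterEvent.MemberRemoved.class, mRemoved -> {
                    router = router.removeRoutee(getContext().actorSelection(mRemoved.member().address() + SLAVE_PATH));
                    log.info("Routee is removed");
                })
                .match(ClusterEvent.UnreachableMember.class, mUnreachable -> {
                    router = router.removeRoutee(getContext().actorSelection(mUnreachable.member().address() + SLAVE_PATH));
                    log.info("Routee is removed");
                })
                .build();
    }

    public static void main(String[] args) {
        int port = 2551;

        // Override the port (this node is the seed node) and give it the "master" role
        Config config =
                ConfigFactory.parseString(
                        "akka.remote.netty.tcp.port=" + port + "\n" +
                        "akka.remote.artery.canonical.port=" + port)
                        .withFallback(ConfigFactory.parseString("akka.cluster.roles = [master]"))
                        .withFallback(ConfigFactory.load());

        ActorSystem system = ActorSystem.create("ClusterSystem", config);
        // HTTP cluster management (akka-management), queried later at 127.0.0.1:8558
        ClusterHttpManagement.get(system);
        AkkaManagement.get(system).start();
        system.actorOf(Props.create(MasterActor.class), "masterActor");
    }
}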

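Here MasterActor subscribes to the cluster's MemberUp events and checks the role carried in each event to decide whether the node that joined is a SlaveActor node. If it is, its SlaveActor is added to the Router. Conversely, when a SlaveActor node leaves the cluster or becomes unreachable, it is removed from the Router.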
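The bookkeeping described above can be modelled without Akka. In this sketch the class MembershipRouteeTable and its EventType enum are hypothetical stand-ins for ClusterEvent.MemberUp, MemberRemoved and UnreachableMember, and routees are represented by their path strings. Like the original actor, it never resets the ready flag and does not re-add a member that later becomes reachable again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical model of MasterActor's bookkeeping. The event names mirror
// ClusterEvent.MemberUp / MemberRemoved / UnreachableMember, but these are
// simplified stand-ins, not Akka classes.
final class MembershipRouteeTable {
    enum EventType { MEMBER_UP, MEMBER_REMOVED, UNREACHABLE }

    static final String SLAVE_PATH = "/user/slaveActor";

    private final List<String> routeePaths = new ArrayList<>();
    private boolean ready = false; // like isReady in MasterActor, never reset once set

    void onEvent(EventType type, String memberAddress, Set<String> roles) {
        String path = memberAddress + SLAVE_PATH;
        switch (type) {
            case MEMBER_UP:
                if (roles.contains("slave")) { // only slave nodes host a SlaveActor
                    routeePaths.add(path);
                    ready = true;
                }
                break;
            case MEMBER_REMOVED:
            case UNREACHABLE:
                routeePaths.removeIf(p -> p.equals(path)); // a no-op for non-slave members
                break;
            default:
                break;
        }
    }

    List<String> routeePaths() {
        return List.copyOf(routeePaths);
    }

    boolean isReady() {
        return ready;
    }
}

class MembershipDemo {
    public static void main(String[] args) {
        MembershipRouteeTable table = new MembershipRouteeTable();
        String host = "akka.tcp://ClusterSystem@127.0.0.1:";
        table.onEvent(MembershipRouteeTable.EventType.MEMBER_UP, host + "2551", Set.of("master"));
        table.onEvent(MembershipRouteeTable.EventType.MEMBER_UP, host + "4914", Set.of("slave"));
        table.onEvent(MembershipRouteeTable.EventType.MEMBER_UP, host + "4936", Set.of("slave"));
        table.onEvent(MembershipRouteeTable.EventType.UNREACHABLE, host + "4914", Set.of("slave"));
        System.out.println(table.routeePaths());
        // [akka.tcp://ClusterSystem@127.0.0.1:4936/user/slaveActor]
    }
}
```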

The client that sends requests to MasterActor

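import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class Client {
    public static void main(String[] args) throws InterruptedException {
        Config config = ConfigFactory.load();
        ActorSystem system = ActorSystem.create("ClusterSystem", config);
        // Look up masterActor on the seed node
        ActorSelection toFind = system.actorSelection("akka.tcp://ClusterSystem@127.0.0.1:2551/user/masterActor");
        int counter = 0;
        while (true) {
            toFind.tell("hello " + counter++, ActorRef.noSender());
            System.out.println("Finish telling");
            Thread.sleep(2000); // one message every 2 seconds
        }
    }
}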

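Open four terminal windows and start one masterActor node, two slaveActor nodes and one Client. The two slaveActors take turns printing the messages sent by the Client. If you then shut down one of the slaveActors, every message from the Client is printed by the remaining one.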

Using router actors

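Besides calling akka.routing.Router ourselves inside an actor, Akka can also create a built-in router actor directly from configuration. Routing exists in both the remoting and the cluster modules. Using the routers from remoting requires the remoting dependency, but in a cluster environment it is not recommended to use them directly; use the cluster aware routers from the cluster module instead.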

There are two types of router actor:

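  • Pool
    The router creates the routees itself as its child actors and deploys them onto remote nodes. When a routee terminates, it is automatically removed from the router's routing table. Unless dynamic resizing is used (by specifying a resizer), the router does not create replacement routees, and once all routees have stopped, the router stops itself as well.
  • Group
    The routee actors are created separately, outside the router actor. The router uses ActorSelection to send messages to the specified actor paths, but by default it does not watch the routees.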
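The Pool lifecycle rules just described can be modelled in a few lines of plain Java. PoolLifecycle is a hypothetical class: it ignores remote deployment and resizers and only tracks which routees remain and whether the router has stopped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a Pool router without a resizer: routees are created up
// front, a terminated routee is dropped and never recreated, and the router
// stops once its last routee is gone. Not an Akka class.
final class PoolLifecycle {
    private final List<String> routees = new ArrayList<>();
    private boolean stopped = false;

    PoolLifecycle(int nrOfInstances) {
        for (int i = 1; i <= nrOfInstances; i++) {
            routees.add("c" + i); // child names, as in the c1..c5 log output later on
        }
    }

    void onRouteeTerminated(String name) {
        if (routees.remove(name) && routees.isEmpty()) {
            stopped = true; // last routee gone: the router stops itself
        }
    }

    List<String> routees() {
        return List.copyOf(routees);
    }

    boolean isStopped() {
        return stopped;
    }
}

class PoolLifecycleDemo {
    public static void main(String[] args) {
        PoolLifecycle pool = new PoolLifecycle(2);
        pool.onRouteeTerminated("c1");
        System.out.println(pool.routees() + " stopped=" + pool.isStopped()); // [c2] stopped=false
        pool.onRouteeTerminated("c2");
        System.out.println(pool.routees() + " stopped=" + pool.isStopped()); // [] stopped=true
    }
}
```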

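A router actor can be configured programmatically or in the configuration file. When it is configured in the file, the code must read the routing configuration explicitly with FromConfig or RemoteRouterConfig (which deploys the routees to remote nodes); otherwise Akka ignores the routing settings even if they are defined in the configuration file.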
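When a router actor forwards a message, it does not change the message's sender, so when a routee replies, the reply goes straight back to the original sender without passing through the router actor.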
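The following trace is a hypothetical plain-Java illustration of that message flow, not Akka code: the router hands the original sender along unchanged, so the routee's reply is addressed to that sender and never touches the router. ForwardingTrace and the hop strings are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical trace of the message flow (not Akka code): the router passes the
// original sender along unchanged, so the routee's reply skips the router.
final class ForwardingTrace {
    private final List<String> hops = new ArrayList<>();

    void tellRouter(String payload, String originalSender) {
        hops.add(originalSender + " -> router: " + payload);
        // The router forwards with the same sender instead of putting itself there
        deliverToRoutee(payload, originalSender);
    }

    private void deliverToRoutee(String payload, String sender) {
        hops.add("router -> routee: " + payload);
        hops.add("routee -> " + sender + ": reply to " + payload); // bypasses the router
    }

    List<String> hops() {
        return List.copyOf(hops);
    }
}

class ForwardingDemo {
    public static void main(String[] args) {
        ForwardingTrace trace = new ForwardingTrace();
        trace.tellRouter("hello 0", "client");
        trace.hops().forEach(System.out::println);
        // client -> router: hello 0
        // router -> routee: hello 0
        // routee -> client: reply to hello 0
    }
}
```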

Both types share the following configuration section:

cluster {
  enabled = on
  allow-local-routees = off
  use-roles = [slave]
}

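  • enabled: whether the cluster aware router is enabled
  • allow-local-routees: whether routees may be created and looked up locally, i.e. on the node where the router itself runs
  • use-roles: restricts the routee lookup to nodes with the given roles; if a routee's node is started with a different role, the router cannot find that routee.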
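Roughly speaking, allow-local-routees and use-roles act as filters over the cluster's Up members. The sketch below models that filter in plain Java; RouteeEligibility and Member are hypothetical names, and requiring the member to carry every role listed in use-roles is an assumption based on how Akka 2.5 checks roles. enabled only switches cluster awareness on, so the model takes it as given.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical model (not Akka code) of which members a cluster-aware router may
// use: the member must be Up, carry every role listed in use-roles, and may only
// be the router's own node when allow-local-routees is on.
final class RouteeEligibility {
    static final class Member {
        final String address;
        final String status;
        final Set<String> roles;

        Member(String address, String status, Set<String> roles) {
            this.address = address;
            this.status = status;
            this.roles = roles;
        }
    }

    static List<String> eligible(List<Member> members, String selfAddress,
                                 boolean allowLocalRoutees, Set<String> useRoles) {
        return members.stream()
                .filter(m -> m.status.equals("Up"))
                .filter(m -> allowLocalRoutees || !m.address.equals(selfAddress))
                .filter(m -> m.roles.containsAll(useRoles)) // empty use-roles matches everyone
                .map(m -> m.address)
                .collect(Collectors.toList());
    }
}

class RouteeEligibilityDemo {
    public static void main(String[] args) {
        List<RouteeEligibility.Member> members = List.of(
                new RouteeEligibility.Member("master:2551", "Up", Set.of("master", "dc-default")),
                new RouteeEligibility.Member("slave:4914", "Up", Set.of("slave", "dc-default")));
        System.out.println(RouteeEligibility.eligible(members, "master:2551", false, Set.of("slave")));
        // [slave:4914]
    }
}
```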

Building on the example above, let's replace the Router we created ourselves with Akka's built-in router actor. The main changes are:

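  • Specify the routing settings in the configuration file
  • In MasterActor, read the routing configuration and create the router and its routees
  1. Add the following to the actor section of the configuration file: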
    actor {
      provider = "cluster"
      deployment {
        /masterActor/poolRouter {
          router = round-robin-pool
          nr-of-instances = 5
          cluster {
            enabled = on
            allow-local-routees = on
            use-roles = [master]
          }
        }
        default {
          cluster {
            max-nr-of-instances-per-node = 5
          }
        }
      }
    }

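Our router is a router actor named poolRouter created under masterActor, so its path is akka.tcp://ClusterSystem@127.0.0.1:2551/user/masterActor/poolRouter. masterActor's actor system reads this configuration file at startup and deployment paths are relative to /user, so the deployment entry only needs the relative path /masterActor/poolRouter. Note: because the routees are created by masterActor, use-roles must match masterActor's role, otherwise no routees will be found!
- router: selects one of the predefined routers
- nr-of-instances: the number of routees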

Two parameters are especially important:

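  • actor.deployment.default.cluster.max-nr-of-instances-per-node: the maximum number of routees the router deploys on each node; the default is 1. So even though we set the number of routees to 5 above, if you leave this at its default and start only one node, you will see only 1 routee ever printing anything.
  • max-total-nr-of-instances: the total number of routees the router may create; the default is 10000, which is usually more than enough.
  2. Modify MasterActor. The commented-out part creates the router purely in code, without the configuration file; feel free to try it.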

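    import akka.actor.AbstractActor;
    import akka.actor.ActorRef;
    import akka.actor.Props;
    import akka.event.Logging;
    import akka.event.LoggingAdapter;
    import akka.routing.FromConfig;
    // The commented-out variant also needs akka.cluster.routing.ClusterRouterPool,
    // akka.cluster.routing.ClusterRouterPoolSettings and akka.routing.RoundRobinPool

    public class MasterActor extends AbstractActor {

        private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

        private ActorRef router;

        @Override
        public void preStart() {
            // Builds the router from actor.deployment./masterActor/poolRouter in application.conf
            router = getContext().actorOf(FromConfig.getInstance().props(Props.create(SlaveActor.class)), "poolRouter");
            /*int totalInstances = 1000;
            int maxInstancePerNode = 5, routeeNumbers = 5;
            boolean allowLocalRoutees = true;
            String role = "master";
            ClusterRouterPoolSettings settings = new ClusterRouterPoolSettings(totalInstances, maxInstancePerNode, allowLocalRoutees, role);
            ClusterRouterPool routerPool = new ClusterRouterPool(new RoundRobinPool(routeeNumbers), settings);
            router = getContext().actorOf(routerPool.props(Props.create(SlaveActor.class)), "poolRouter");*/
        }

        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .match(String.class, msg -> {
                        log.info("Master got: {}", msg);
                        // Keep the original sender so routees reply to the client directly
                        router.tell(msg, getSender());
                    })
                    .build();
        }
    }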
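  3. Run
    Everything else stays the same. This time only Client and MasterActor need to be started, because the SlaveActors are created automatically inside MasterActor. You should see logs like these: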

    [INFO] [11/16/2018 14:19:58.361] [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/user/masterActor] Master got: hello
    [INFO] [11/16/2018 14:19:58.361] [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/user/masterActor/poolRouter/c1] Node akka://ClusterSystem/user/masterActor/poolRouter/c1#-1154482163 receives: hello
    [INFO] [11/16/2018 14:20:00.362] [ClusterSystem-akka.actor.default-dispatcher-16] [akka://ClusterSystem/user/masterActor] Master got: hello
    [INFO] [11/16/2018 14:20:00.362] [ClusterSystem-akka.actor.default-dispatcher-16] [akka://ClusterSystem/user/masterActor/poolRouter/c2] Node akka://ClusterSystem/user/masterActor/poolRouter/c2#-50692619 receives: hello
    [INFO] [11/16/2018 14:20:02.365] [ClusterSystem-akka.actor.default-dispatcher-18] [akka://ClusterSystem/user/masterActor] Master got: hello
    [INFO] [11/16/2018 14:20:02.365] [ClusterSystem-akka.actor.default-dispatcher-18] [akka://ClusterSystem/user/masterActor/poolRouter/c3] Node akka://ClusterSystem/user/masterActor/poolRouter/c3#1415650532 receives: hello
    [INFO] [11/16/2018 14:20:04.366] [ClusterSystem-akka.actor.default-dispatcher-3] [akka://ClusterSystem/user/masterActor] Master got: hello
    [INFO] [11/16/2018 14:20:04.366] [ClusterSystem-akka.actor.default-dispatcher-3] [akka://ClusterSystem/user/masterActor/poolRouter/c4] Node akka://ClusterSystem/user/masterActor/poolRouter/c4#1345851811 receives: hello
    [INFO] [11/16/2018 14:20:06.368] [ClusterSystem-akka.actor.default-dispatcher-20] [akka://ClusterSystem/user/masterActor] Master got: hello
    [INFO] [11/16/2018 14:20:06.368] [ClusterSystem-akka.actor.default-dispatcher-20] [akka://ClusterSystem/user/masterActor/poolRouter/c5] Node akka://ClusterSystem/user/masterActor/poolRouter/c5#-1384624865 receives: hello

The messages are printed by c1 through c5 in turn, so round-robin load balancing is working.
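Why five routees on a single node, and why only one with the default settings? The sketch below is a hypothetical plain-Java model of the deployment rule: keep adding one routee to the eligible node that currently has the fewest, until the total is reached or every node hits max-nr-of-instances-per-node. It roughly follows what Akka's cluster pool does, but PoolDeployment is an illustration, not Akka's implementation.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model (not Akka code) of how many routees a cluster-aware pool
// deploys: it keeps adding one routee to the eligible node that currently has the
// fewest, until it reaches the total or every node is at the per-node cap.
final class PoolDeployment {
    static Map<String, Integer> deploy(List<String> nodes, int totalInstances, int maxPerNode) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        nodes.forEach(n -> counts.put(n, 0));
        int deployed = 0;
        while (deployed < totalInstances) {
            String target = null;
            for (Map.Entry<String, Integer> e : counts.entrySet()) {
                boolean hasRoom = e.getValue() < maxPerNode;
                if (hasRoom && (target == null || e.getValue() < counts.get(target))) {
                    target = e.getKey();
                }
            }
            if (target == null) {
                break; // every node is full
            }
            counts.merge(target, 1, Integer::sum);
            deployed++;
        }
        return counts;
    }
}

class PoolDeploymentDemo {
    public static void main(String[] args) {
        // Only the master node is eligible (use-roles = [master], allow-local-routees = on)
        System.out.println(PoolDeployment.deploy(List.of("master"), 5, 1)); // {master=1}
        System.out.println(PoolDeployment.deploy(List.of("master"), 5, 5)); // {master=5}
    }
}
```

With the default cap of 1 a single node gets one routee; raising it to 5 in the default block is what lets c1 to c5 all live on the master node.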

Group

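In this mode the routees are created outside the router. They should generally be started before the router, because the router tries to contact them while it starts up. Usage is very similar to the Pool type; the differences are: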

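  • routees.paths must be specified (with remoting, full protocol paths such as akka.tcp://ClusterSystem:2551/user/testActor are supported, but cluster mode does not support them and only accepts relative paths)
  • There is no nr-of-instances parameter, so none needs to be specified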
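One way to picture how a relative routees.paths entry is used: it is combined with the address of every eligible node to get one lookup path per routee. The sketch is a hypothetical plain-Java model (GroupPaths is an invented name), and its "must start with /" check is a simplified stand-in for Akka's own validation, which may be worded and implemented differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not Akka code): a cluster-aware group combines each eligible
// node address with each relative routees.paths entry to get one ActorSelection
// path per routee. Simplified validation: paths must be relative and start with "/".
final class GroupPaths {
    static List<String> resolve(List<String> nodeAddresses, List<String> routeesPaths) {
        for (String path : routeesPaths) {
            if (!path.startsWith("/")) {
                throw new IllegalArgumentException("routees.paths must be relative like /user/x, got: " + path);
            }
        }
        List<String> result = new ArrayList<>();
        for (String node : nodeAddresses) {
            for (String path : routeesPaths) {
                result.add(node + path);
            }
        }
        return result;
    }
}

class GroupPathsDemo {
    public static void main(String[] args) {
        List<String> slaves = List.of(
                "akka.tcp://ClusterSystem@127.0.0.1:4914",
                "akka.tcp://ClusterSystem@127.0.0.1:4936");
        GroupPaths.resolve(slaves, List.of("/user/slaveActor")).forEach(System.out::println);
        // akka.tcp://ClusterSystem@127.0.0.1:4914/user/slaveActor
        // akka.tcp://ClusterSystem@127.0.0.1:4936/user/slaveActor
    }
}
```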

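The group router takes the relative path configured in routees.paths, looks up the routee with ActorSelection on every node of the current cluster that has the specified role (so use-roles must match the role the slaves are started with), and simply tells the message to it. Because the whole process is asynchronous, the group router does not actually care whether a node really hosts the routee or whether the routee started properly; it just forwards according to the configuration.
It does not check whether the routee exists because Akka has a peer-to-peer design in which all nodes are equal. Under that convention it assumes that every node in the cluster runs the same code and therefore hosts the routee.
It does not check whether the routee started properly because all communication is asynchronous.
Personally, though, I think this should still be strengthened with a circuit breaker, which would make it more convenient to use.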
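The circuit-breaker idea can be illustrated with a minimal plain-Java sketch. This is not how Akka's group routers behave, and it is not akka.pattern.CircuitBreaker (which Akka does ship); the class name, the thresholds and the idea of keeping one breaker per routee are assumptions for illustration. A router wrapper could call allowRequest() before forwarding to a routee and report successes and failures back, for example based on reply timeouts.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical minimal circuit breaker (plain Java, not akka.pattern.CircuitBreaker).
// CLOSED: calls pass. After maxFailures consecutive failures it goes OPEN and rejects
// calls; once resetTimeoutMillis has elapsed it goes HALF_OPEN and lets calls through
// on trial: a success closes it again, a failure reopens it.
final class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int maxFailures;
    private final long resetTimeoutMillis;
    private final LongSupplier clock; // injectable so the behaviour can be tested
    private State state = State.CLOSED;
    private int failures = 0;
    private long openedAt = 0L;

    SimpleCircuitBreaker(int maxFailures, long resetTimeoutMillis, LongSupplier clock) {
        this.maxFailures = maxFailures;
        this.resetTimeoutMillis = resetTimeoutMillis;
        this.clock = clock;
    }

    boolean allowRequest() {
        if (state == State.OPEN && clock.getAsLong() - openedAt >= resetTimeoutMillis) {
            state = State.HALF_OPEN;
        }
        return state != State.OPEN;
    }

    void recordSuccess() {
        failures = 0;
        state = State.CLOSED;
    }

    void recordFailure() {
        failures++;
        if (state == State.HALF_OPEN || failures >= maxFailures) {
            state = State.OPEN;
            openedAt = clock.getAsLong();
        }
    }

    State state() {
        return state;
    }
}

class CircuitBreakerDemo {
    public static void main(String[] args) {
        AtomicLong now = new AtomicLong(0);
        SimpleCircuitBreaker breaker = new SimpleCircuitBreaker(2, 1000, now::get);
        breaker.recordFailure();
        breaker.recordFailure();
        System.out.println(breaker.allowRequest()); // false: open after 2 failures
        now.set(1000);
        System.out.println(breaker.allowRequest()); // true: half-open trial
    }
}
```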

  1. Modify the configuration file
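    actor {
      provider = "cluster"
      deployment {
        /masterActor/groupRouter {
          router = round-robin-group
          # relative path of the routee on every eligible node (required for a group)
          routees.paths = ["/user/slaveActor"]
          cluster {
            enabled = on
            allow-local-routees = on
            use-roles = [slave]
          }
        }
      }
    }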

The roles in use-roles may be written with or without quotes.

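  2. Change the router's name in MasterActor to match the configuration file. The commented-out part creates the router purely in code, without the configuration file; feel free to try it.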

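    @Override
    public void preStart() {
        // A group router does not create routees, so no routee Props are passed
        router = getContext().actorOf(FromConfig.getInstance().props(), "groupRouter");
        /*Iterable<String> routeesPaths = Collections.singletonList("/user/slaveActor");
        Set<String> useRoles = new HashSet<>(Collections.singletonList("slave"));
        router = getContext().actorOf(
                new ClusterRouterGroup(new RoundRobinGroup(routeesPaths),
                        new ClusterRouterGroupSettings(10000, routeesPaths, true, useRoles)).props(),
                "groupRouter");*/
    }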
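  3. Run
    Start MasterActor and several SlaveActors in separate windows, then check that the cluster is stable, i.e. all nodes are Up. If akka-management-cluster-http is enabled, you can send a query to the management endpoint, for example
    127.0.0.1:8558/cluster/members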

    {
        "selfNode": "akka.tcp://ClusterSystem@127.0.0.1:2551",
        "oldestPerRole": {
            "master": "akka.tcp://ClusterSystem@127.0.0.1:2551",
            "dc-default": "akka.tcp://ClusterSystem@127.0.0.1:2551",
            "slave": "akka.tcp://ClusterSystem@127.0.0.1:4914"
        },
        "leader": "akka.tcp://ClusterSystem@127.0.0.1:2551",
        "oldest": "akka.tcp://ClusterSystem@127.0.0.1:2551",
        "unreachable": [],
        "members": [
            {
                "node": "akka.tcp://ClusterSystem@127.0.0.1:2551",
                "nodeUid": "-1141014070",
                "status": "Up",
                "roles": [
                    "master",
                    "dc-default"
                ]
            },
            {
                "node": "akka.tcp://ClusterSystem@127.0.0.1:4914",
                "nodeUid": "344021242",
                "status": "Up",
                "roles": [
                    "slave",
                    "dc-default"
                ]
            },
            {
                "node": "akka.tcp://ClusterSystem@127.0.0.1:4936",
                "nodeUid": "678163307",
                "status": "Up",
                "roles": [
                    "slave",
                    "dc-default"
                ]
            },
            {
                "node": "akka.tcp://ClusterSystem@127.0.0.1:4957",
                "nodeUid": "-573369962",
                "status": "Up",
                "roles": [
                    "slave",
                    "dc-default"
                ]
            }
        ]
    }

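Then start the Client and send messages to masterActor. The logs of received messages are printed evenly across the slaves, so round-robin load balancing is working. Start a few more SlaveActors and messages are forwarded to the new actors as well; this is where Group beats Pool: its set of routees can change dynamically.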

Now try changing the configuration so that slaveActor has the same role as masterActor. Run it again and you will see lost messages, along with logs about failed forwarding.

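This happens because, to keep things easy to follow, all the examples above use one master plus several slaves.
Akka, however, has a peer-to-peer design in which all nodes are equal, so the router actor naturally assumes that every node with the matching role hosts a routee. It never checks whether the routee actually works and simply forwards the message, and in the setup above no slaveActor was ever started on the masterActor's node, so messages sent there are lost.
Change allow-local-routees to off in the configuration, and the masterActor's node will no longer be added to the routing list. By the same token, though, you can try starting an empty ActorSystem with that role and see what happens.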
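The loss is easy to quantify with a hypothetical plain-Java simulation (LostMessageSimulation is an invented name, not Akka code): round-robin over every eligible node, counting a message as lost whenever the chosen node has no slaveActor, since nothing checks beforehand.

```java
import java.util.List;
import java.util.Set;

// Hypothetical simulation (not Akka code): a round-robin group spreads messages over
// every eligible node without checking whether the routee really exists there,
// so messages sent to nodes without a slaveActor are lost.
final class LostMessageSimulation {
    // Returns {delivered, lost}.
    static int[] run(List<String> eligibleNodes, Set<String> nodesWithRoutee, int messages) {
        if (eligibleNodes.isEmpty()) {
            return new int[] {0, messages}; // nowhere to send anything
        }
        int delivered = 0;
        int lost = 0;
        for (int i = 0; i < messages; i++) {
            String target = eligibleNodes.get(i % eligibleNodes.size());
            if (nodesWithRoutee.contains(target)) {
                delivered++;
            } else {
                lost++; // in Akka this message would end up as a dead letter
            }
        }
        return new int[] {delivered, lost};
    }
}

class LostMessageDemo {
    public static void main(String[] args) {
        Set<String> withRoutee = Set.of("slave-1", "slave-2");
        // allow-local-routees = on: the master node (same role, no slaveActor) is used too
        int[] on = LostMessageSimulation.run(List.of("master", "slave-1", "slave-2"), withRoutee, 6);
        // allow-local-routees = off: the master node is skipped
        int[] off = LostMessageSimulation.run(List.of("slave-1", "slave-2"), withRoutee, 6);
        System.out.println("on:  delivered=" + on[0] + " lost=" + on[1]);   // delivered=4 lost=2
        System.out.println("off: delivered=" + off[0] + " lost=" + off[1]); // delivered=6 lost=0
    }
}
```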

Routing algorithms provided by Akka:

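| Algorithm | Description | Configuration | Router class (append Pool or Group) |
|---|---|---|---|
| RoundRobin | Sends messages to each routee in the routing list in turn | round-robin-pool or round-robin-group | akka.routing.RoundRobin |
| Random | Picks a random routee from the routing list for each message | random-pool or random-group | akka.routing.Random |
| SmallestMailbox | Prefers the routee whose mailbox holds the fewest messages | smallest-mailbox-pool | akka.routing.SmallestMailbox |
| Broadcast | Forwards each message to all routees at once | broadcast-pool or broadcast-group | akka.routing.Broadcast |
| ScatterGatherFirstCompleted | Sends the message to all routees, waits for the first reply and returns it to the sender; the other replies are ignored | scatter-gather-pool or scatter-gather-group | akka.routing.ScatterGatherFirstCompleted |
| TailChopping | Sends the message to a randomly chosen routee, then after a short delay to another randomly chosen routee; the first reply is returned to the sender and the others are ignored | tail-chopping-pool or tail-chopping-group | akka.routing.TailChopping |
| ConsistentHashing | Chooses the routee with a consistent hashing algorithm | consistent-hashing-pool or consistent-hashing-group | akka.routing.ConsistentHashing |
| Balancing | All routees share one mailbox, so work from busy routees is redistributed to idle ones; groups and broadcasting are not supported | balancing-pool | akka.routing.Balancing |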
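Of these, ConsistentHashing is the least obvious, so here is a minimal consistent-hash ring in plain Java. It illustrates the general technique, not Akka's implementation: HashRing is a hypothetical name, and the MD5-based hash and the number of virtual nodes per routee are assumptions (Akka uses its own hash function and virtual-nodes factor, and takes the key from the message via ConsistentHashable, a hashMapping, or a ConsistentHashableEnvelope). The key property is that removing one routee only moves the keys that routee owned.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical consistent-hash ring (not Akka's implementation): every routee is
// placed on the ring at several virtual points, and a key is routed to the first
// point clockwise from the key's own hash.
final class HashRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();

    HashRing(List<String> routees, int virtualNodesPerRoutee) {
        for (String routee : routees) {
            for (int v = 0; v < virtualNodesPerRoutee; v++) {
                ring.put(hash(routee + "#" + v), routee);
            }
        }
    }

    String routeeFor(String key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no routees");
        }
        SortedMap<Long, String> tail = ring.tailMap(hash(key));
        Long point = tail.isEmpty() ? ring.firstKey() : tail.firstKey(); // wrap around the ring
        return ring.get(point);
    }

    // First 8 bytes of MD5 as a long; MD5 is available on every Java platform.
    private static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (d[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}

class HashRingDemo {
    public static void main(String[] args) {
        HashRing ring = new HashRing(List.of("slave-1", "slave-2", "slave-3"), 100);
        // The same key always lands on the same routee
        System.out.println(ring.routeeFor("user-42").equals(ring.routeeFor("user-42"))); // true
    }
}
```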

Code for this chapter: https://github.com/EdisonXu/akka-start-demo/tree/master/cluster

