34

复杂业务需求下,我们为什么选择 Akka 作为异步通信框架?

 5 years ago
source link: https://www.infoq.cn/article/OvJhK48PMK7F*lbw3tKg?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Akka 是 Scala 语言实现的一套基于 Actor 模型的异步通信框架,可用于构建高并发、分布式、可容错、事件驱动的基于 JVM 的应用,在 Spark 中曾被用于实现进程、节点间通信,在实际项目中协助我们成功搭建了满足业务需求的模型部署平台。

项目背景

某国内大型连锁餐饮企业旗下拥有大量门店。餐厅门店的每日生产、订货、排班都依赖于每日客单量预估的合理性,其内部数据团队实现了一套预估模型,需要 TalkingData 帮助构建一个工程化平台以支撑模型的训练和部署,从而将模型真正地应用到实际生产环节中。

经过交流,我们发现在实际生产环境中,在各方面存在一些问题:

  • 异步:所有门店的前日销售、业务等数据均由各自门店的店长负责整合上传。上传的开始时间、结束时间、数据的完整性等均不确定。而模型训练和预测均依赖这部分数据,这就意味这无法为模型训练和预测设置统一的开始入口。

  • 高并发:除了一些特殊类型的门店,绝大多数门店的营业时间相对固定,从店长决定整理上传销售数据,到准备物料、排班准备次日营业,留给模型训练和模型预测回吐预测结果的时间大概为 3 小时。如果每个门店的预测指标有 2 至 3 项,那么需要有足够的调度能力在规定时间内完成大概 2 万次模型训练加预测流程。

  • 容错:由于门店数量众多且情况各不相同,仍然有很多潜在的因素可能导致流程出错或失败。原则上,某次流程的失败不应该对其他流程造成任何影响,每个流程在平台层面应该成为互相独立的任务。

因此,我们需要一套轻量化的分布式服务框架,来实现满足上述需求的模型训练预测平台,并在一定程度上保证平台的可拓展性。结合此前团队内的技术积累,最终选择了 Akka 框架用于实现平台的内部通信。

选型过程

消息驱动方式——流程异步化

一次完整的预测任务包括:训练数据准备→模型训练→模型结果导出→预测数据准备→预测结果导出,其中数据准备步骤在时间上不确定,模型相关步骤在执行结果上不确定,如果采用同步模型,将会产生大量的等待线程,占用浪费大量资源。在 Actor 模型中,每个 Actor 作为一个基本计算单元,回应接收到的消息,同时并行的:

  • 发送有限数量的消息给其他 Actor
  • 创建有限数量的新 Actor
  • 指定接受到下一个消息时的行为

上述操作没有顺序执行的假设,因此可以并行进行。发送者与已经发送的消息解耦,可以进行无需等待的异步通信。

zQVZNrA.png!web

Actor 模型通信方式

Akka 中的 Actor 本质上就是接收消息并采取行动处理消息的对象,是封装状态和行为的对象,它们唯一的通信方式是交换消息——把消息存放在接收方的邮箱里。Actor 自然形成树形结构,这种结构的精髓在于任务被拆开、委托,直到任务小到可以被完整地处理。因此,我们将预测任务的各个步骤拆分抽象,并创建类型消息与步骤对应,将每个步骤交给线程级别的 Actor 执行处理,通过发送不同类型的消息来触发创建不同操作的 Actor,让整个预测流程无需等待。

结构——应对高并发

由于绝大多数门店的营业时间大致相同,平台在流量上会有明显的峰值和低谷,在低谷期间平台需要尽可能减少资源占有量,而在流量峰值来临时平台要能够及时响应,保证足够的可用性。

经过讨论,我们确定了采用 Master-Worker 模式的平台结构,Master 负责接收与分配任务,Worker 负责处理执行具体的模型任务。

Master 和 Worker 均为独立的 ActorSystem,管理内部不不同操作逻辑的 Actor,在空闲状态下占有资源很小。Actor 为线程级别,同样仅占用极少量资源,生命周期由 ActorSystem 统一管理。少量请求时,Actor 线程具有很高的复用率,请求并发高时,ActorSystem 会创建大量的 Actor 线程用来承接请求,保证可用性。

MfeAVry.png!web

Akka 中 Actor 的生命周期

子 Actor——模块化提高容错

每个预测任务的模型相关步骤均存在失败的可能性,此外,数据准备过程中的网络波动、内容校验出错等情况,都会导致当前预测任务的失败。对于失败的任务,我们希望能够尽可能记录错误信息,为重跑提供先决条件。

在 Akka 中,构建了父子 Actor 的树形监督结构,提供 Actor 的监督机制以保证容错性,把处理响应错误的责任交给出错对象以外的实体。父 Actor 创建子 Actor 来委托处理子任务,同时便会自动地监管它们。子 Actor 列表维护在父 Actor 的上下文中,父 Actor 可以访问它。

3uE3m2I.png!web

Akka 中的 Actor 结构

通过更进一步的拆分细化,我们将 Worker 端的 Actor 分为 Prepare 和 Executor 两种,Prepare 为主要负责数据准备步骤,Executor 负责模型相关步骤,统一由 Worker 端的父 Actor 管理,错误和异常均向上层抛出,由 Worker 端的父 Actor 记录并发送给的错误收集模块统一处理。

实践应用

ActorSystem

创建 ActorSystem 时,默认将在 classpath 中寻找 application.conf、 application.json 和 application.properties,并自动加载:

复制代码

valsystem=ActorSystem("RsModelActorSystem")
valsystem=ActorSystem("RsModelActorSystem", ConfigFactory.load())// 同上

如果想要使用自己的配置文件,可以通过 ConfigFactory 来配置加载:

复制代码

valsystem =ActorSystem("UniversityMessageSystem",
ConfigFactory.load("own-application.conf"))


valconfig =ConfigFactory.parseString(
s"""
|akka.remote.netty.tcp.hostname = $host
|akka.actor.provider = akka.remote.RemoteActorRefProvider
|akka.remote.enabled-transport = akka.remote.netty.tcp
|akka.remote.netty.tcp.port = 2445
""".stripMargin)
valsystem =ActorSystem("RsModelActorSystem",
config.withFallback(ConfigFactory.load()))// 同上

ActorSystem 的配置参数中有大量参数可以自定义,需要根据实际需要修改,例如在该项目中,后期单个算法任务对象大小超过了 Akka remote 默认包大小 128000 bytes,需要修改参数 akka.remote.netty.tcp.maximum-frame-size

Actor

一个 Actor 包含了状态、行为、一个邮箱、子 Actor 和一个监管策略,所有这些封装在一个 Actor 引用里。Actor 对象通常包含一些变量来反映其所处的可能状态,Akka-actor 自身的轻量线程与系统的其他部分完全隔离,因此无须担心并发问题。每当一个消息被处理,它会与 Actor 的当前行为进行匹配。行为是一个函数,它定义了在某个时间点处理当前消息所要采取的动作,需要结合实际需求编写具体逻辑。Actor 的邮箱是连接发送者与接收者的纽带,每个 Actor 有且仅有一个邮箱,所有的发来的消息都在邮箱里排队。可以有不同策略的邮箱实现供选择,缺省时为 FIFO。

编写逻辑

在 Actor 类中,主要逻辑均在 receive 方法中实现,通过偏函数方法,执行并返回对应的逻辑:

复制代码

//ActorLogging 提供 Actor 内部的日志输出
classRsActorextendsActorwithActorLogging{
overridedefreceive:Receive= {
caseMapMessage(parameters) =>
println(parameters.get("code"))

caseMapKeyMessage(parameters, key) =>
println(parameters.get(key))

caseStringMessage(msg) =>
println(msg.getBytes().length)

caseo:Object=>
println(o.getClass)

case_:AnyRef=>
println("233")
}
}

生成引用

生成一个可以接收消息的 Actor 实例主要有两个方法:

复制代码

// 生成一个基于本地类的 Actor 实例
valrsActor = system.actorOf(Props[RsActor],"rsActor")
// 生成一个基于远程地址的 Actor 实例
valrmActor =
system.actorSelection("akka.tcp://[email protected]:2445/user/rsActor")

// 使用! 向对应的 Actor 实例发送消息
rsActor !StringMessage("test")
rmActor !MapMessage(Map("code"->"233"))

Message

Akka 中对传递的消息内容并没有太严格要求,可以是基本数据类型,也可以是支持序列化的对象:

复制代码

//scala 的 case class 便于简洁地创建消息类
caseclassStringMessage(msg:String)extendsSerializable
caseclassMapMessage(parameters:Map[String,String])extendsSerializable
caseclassMapKeyMessage(parameters:Map[String,String], key:String)extendsSerializable

其他

Akka 作为一款被广泛使用的开源工具,在实际项目中体现出了很多的优势,异步的消息驱动方式也给我们提供了一套新的思路和实现方法。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK