5

Spark 开源新特性:Catalyst 优化流程裁剪

 3 years ago
source link: https://xie.infoq.cn/article/6f16ac2cccdad2f27f54be84a
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

​​摘要:为了解决过多依赖 Hive 的问题, SparkSQL 使用了一个新的 SQL 优化器替代 Hive 中的优化器, 这个优化器就是 Catalyst。

本文分享自华为云社区《Spark 开源新特性:Catalyst 优化流程裁剪》,作者:hzjturbo 。

1. 问题背景

上图是典型的 Spark Catalyst 优化器的布局,一条由用户输入的 SQL,到真实可调度执行的 RDD DAG 任务,需要经历以下五个阶段:

  • Parser: 将 SQL 解析成相应的抽象语法树(AST),spark 也称为 Unresolved Logical Plan;

  • Analyzer: 通过查找 Metadata 的 Catalog 信息,将 Unresolved Logical Plan 变为 Resolved Logical Plan,这个过程会做表、列、数据类型等做校验;

  • Optimizer: 逻辑优化流程,通过一些优化规则对匹配上的 Plan 做转换,得到优化后的逻辑 Plan

  • Planner:根据 Optimized Logical Plan 的统计信息等转换成相应的 Physical Plan

  • Query Execution: 主要是执行前的一些 preparations 优化,比如 AQE, Exchange Reuse, CodeGen stages 合并等

上述的五个阶段中,除了 Parser (由 Antlr 实现),其他的每个阶段都是由一个个规则(Rule)构成,总共大约有 200+个,对于不同的规则,还可能需要跑多次,所以对于相对比较复杂的查询,可能得到一个 executed Plan 都需要耗费数秒。

Databricks 内部基准测试表明,对于 TPC-DS 查询,每个查询平均调用树转换函数约 280k 次,这远远超出了必要的范围。因此,我们探索在每个树节点中嵌入 BitSet,以传递自身及其子树的信息,并利用计划不变性来修剪不必要的遍历。通过原型实现验证:在 TPC-DS 基准测试中,我们看到优化的速度约为 50%,分析的速度约为 30%,整个查询编译的速度约为 34%(包括 Hive 元存储 RPC 和文件列表)[1]。

2. 设计实现

2.1 Tree Pattern Bits and Rule Id Bits

  • Tree pattern bits

在 TreeNode 增加 nodePatterns 属性,所有继承该类的节点可以通过复写该属性值来标识自己的属性。

/** * @return a sequence of tree pattern enums in a TreeNode T. It does not include propagated *         patterns in the subtree of T. */protected val nodePatterns: Seq[TreePattern] = Seq()

TreePattern 是一个枚举类型, 对于每个节点/表达式都可以为其设置一个 TreePattern 方便标识,具体可见 TreePatterns.scala 。

例如对于 Join 节点的 nodePatterns:

override val nodePatterns : Seq[TreePattern] = {  var patterns = Seq(JOIN)  joinType match {    case _: InnerLike => patterns = patterns :+ INNER_LIKE_JOIN    case LeftOuter | FullOuter | RightOuter => patterns = patterns :+ OUTER_JOIN    case LeftSemiOrAnti(_) => patterns = patterns :+ LEFT_SEMI_OR_ANTI_JOIN    case NaturalJoin(_) | UsingJoin(_, _) => patterns = patterns :+ NATURAL_LIKE_JOIN    case _ =>  }  patterns}
  • Rule ID bits

将规则 ID 的缓存 BitSet 嵌入到每个树/表达式节点 T 中,这样我们就可以跟踪规则 R 对于根植于 T 的子树是有效还是无效。这样,如果 R 在 T 上被调用,并且已知 R 无效,如果 R 再次应用于 T(例如,R 位于定点规则批处理中),我们可以跳过它。这个想法最初被用于 Cascades optimizer,以加快探索性规划。

Rule:

abstract class Rule[TreeType <: TreeNode[_]] extends SQLConfHelper with Logging {  // The integer id of a rule, for pruning unnecessary tree traversals.  protected lazy val ruleId = RuleIdCollection.getRuleId(this.ruleName)

TreeNode:

/** * A BitSet of rule ids to record ineffective rules for this TreeNode and its subtree. * If a rule R (which does not read a varying, external state for each invocation) is * ineffective in one apply call for this TreeNode and its subtree, R will still be * ineffective for subsequent apply calls on this tree because query plan structures are * immutable. */private val ineffectiveRules: BitSet = new BitSet(RuleIdCollection.NumRules)

2.2 Changes to The Transform Function Family

改造后的 transform 方法相比之前的多了两个判断,如下所示

def transformDownWithPruning(  cond: TreePatternBits => Boolean, // 判断是否存在可优化的节点,由规则设计者所提供  ruleId: RuleId = UnknownRuleId // 不会生效的规则ID,自动更新	)(rule: PartialFunction[BaseType, BaseType]): BaseType = {  // 如果上述两个条件存在一个不满足,直接跳过本次规则  if (!cond.apply(this) || isRuleIneffective(ruleId)) {    return this  }  // 执行rule的逻辑  val afterRule = CurrentOrigin.withOrigin(origin) {    rule.applyOrElse(this, identity[BaseType])  }  // Check if unchanged and then possibly return old copy to avoid gc churn.  if (this fastEquals afterRule) {    val rewritten_plan = mapChildren(_.transformDownWithPruning(cond, ruleId)(rule))    // 如果没生效,把规则ID加入到不生效的BitSet里    if (this eq rewritten_plan) {      markRuleAsIneffective(ruleId)      this    } else {      rewritten_plan    }  } else {    // If the transform function replaces this node with a new one, carry over the tags.    afterRule.copyTagsFrom(this)    afterRule.mapChildren(_.transformDownWithPruning(cond, ruleId)(rule))  }}

2.3 Changes to An Individual Rule

规则的例子:

object OptimizeIn extends Rule[LogicalPlan] with SQLConfHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform ({   case q: LogicalPlan => q transformExpressionsDown ({     case In(v, list) if list.isEmpty => ...     case expr @ In(v, list) if expr.inSetConvertible => ...   }, _.containsPattern(IN), ruleId) // 必须包含IN }, _.containsPattern(IN), ruleId) // 必须包含IN}

3. 测试结果

在 Delta 中使用 TPC-DS SF10 对 TPC-DS 查询编译时间进行了基准测试。结果如下:

  • 图 1 显示了查询编译速度;

  • 表 1 显示了几个关键树遍历函数的调用计数和 CPU 减少的细分。

我简单运行了开版本的 TPCDSQuerySuite,该测试会把 TPCDS 的语句解析优化,并且检查下生成的代码(CodeGen),平均耗时的时间为三次运行得到的最优值, 得到的结果如下:

  • 合入 PR 前[2], 包含 156 个 Tpcds 查询,平均总耗时~56s

  • 最新 Spark 开源代码,包含 150 个 Tpcds 查询,平均总耗时~19s

之所以最新的 Tpcds 查询比合入 PR 前的条数少 6 条,是因为后续有个减少重复 TPCDS 的 PR。总时长优化前是优化后的两倍多。

[1]. [SPARK-34916] Tree Traversal Pruning for Catalyst Transform/Resolve Function Families. SISP

[2]. [SPARK-35544][SQL] Add tree pattern pruning to Analyzer rules.

[3]. Building a SIMD Supported Vectorized Native Engine for Spark SQL. link

点击关注,第一时间了解华为云新鲜技术~


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK