
Flink SQL Subgraph Reuse Logic Analysis - Aitozi

source link: https://www.cnblogs.com/Aitozi/p/16687308.html

Flink SQL Subgraph Reuse Logic Analysis

Subplan (subgraph) reuse optimization finds nodes that are duplicated in the SQL execution plan and reuses them, so that the duplicated part is computed only once. First, recall the main stages of SQL execution: parser -> validate -> logical optimize -> physical optimize -> translateToExecNode.
The subgraph reuse logic runs in this last stage, inside translateToExecNodeGraph.
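Before diving into the implementation, here is a minimal sketch of what the optimization means for a user query. The table name, schema, connector and config key below are my own assumptions for illustration, not taken from the original post: the self-join's plan contains the same filtered scan twice, and with subplan reuse enabled (it is on by default) the optimizer should merge the duplicated branch.

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object SubplanReuseDemo {
  def main(args: Array[String]): Unit = {
    val tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
    // Subplan reuse is enabled by default; set it explicitly just to make the intent visible.
    tEnv.getConfig.getConfiguration
      .setString("table.optimizer.reuse-sub-plan-enabled", "true")

    tEnv.executeSql(
      """CREATE TABLE orders (
        |  id BIGINT,
        |  amount DOUBLE
        |) WITH ('connector' = 'datagen')""".stripMargin)

    // Both join inputs contain the identical "filtered scan of orders" subtree;
    // after subplan reuse only one copy should remain in the optimized plan.
    println(tEnv.explainSql(
      """SELECT a.id, b.amount
        |FROM (SELECT * FROM orders WHERE amount > 10) a
        |JOIN (SELECT * FROM orders WHERE amount > 10) b ON a.id = b.id""".stripMargin))
  }
}

With that picture in mind, translateToExecNodeGraph is where the reuse actually happens.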

private[flink] def translateToExecNodeGraph(
    optimizedRelNodes: Seq[RelNode],
    isCompiled: Boolean): ExecNodeGraph = {
  val nonPhysicalRel = optimizedRelNodes.filterNot(_.isInstanceOf[FlinkPhysicalRel])
  if (nonPhysicalRel.nonEmpty) {
    throw new TableException(
      "The expected optimized plan is FlinkPhysicalRel plan, " +
        s"actual plan is ${nonPhysicalRel.head.getClass.getSimpleName} plan.")
  }

  require(optimizedRelNodes.forall(_.isInstanceOf[FlinkPhysicalRel]))

  // Rewrite same rel object to different rel objects
  // in order to get the correct dag (dag reuse is based on object not digest)
  val shuttle = new SameRelObjectShuttle()
  val relsWithoutSameObj = optimizedRelNodes.map(_.accept(shuttle))

  // reuse subplan
  val reusedPlan = SubplanReuser.reuseDuplicatedSubplan(relsWithoutSameObj, tableConfig)

  // convert FlinkPhysicalRel DAG to ExecNodeGraph
  val generator = new ExecNodeGraphGenerator()
  val execGraph =
    generator.generate(reusedPlan.map(_.asInstanceOf[FlinkPhysicalRel]), isCompiled)

  // process the graph
  val context = new ProcessorContext(this)
  val processors = getExecNodeGraphProcessors
  processors.foldLeft(execGraph)((graph, processor) => processor.process(graph, context))
}

As you can see, it first checks that all the optimized RelNodes are FlinkPhysicalRel, i.e. nodes of the physical execution plan:

require(optimizedRelNodes.forall(_.isInstanceOf[FlinkPhysicalRel]))

SameRelObjectShuttle

/**
 * Rewrite same rel object to different rel objects.
 *
 * <p>e.g.
 * {{{
 *           Join                     Join
 *          /    \                   /    \
 *     Filter1  Filter2   =>    Filter1  Filter2
 *          \    /                  |       |
 *           Scan                 Scan1   Scan2
 * }}}
 * After rewrote, Scan1 and Scan2 are different object but have same digest.
 */
class SameRelObjectShuttle extends DefaultRelShuttle {
  private val visitedNodes = Sets.newIdentityHashSet[RelNode]()

  override def visit(node: RelNode): RelNode = {
    val visited = !visitedNodes.add(node)
    var change = false
    val newInputs = node.getInputs.map { input =>
      val newInput = input.accept(this)
      change = change || (input ne newInput)
      newInput
    }
    if (change || visited) {
      node.copy(node.getTraitSet, newInputs)
    } else {
      node
    }
  }
}

Next the rel nodes are rewritten. A RelShuttle provides a visitor pattern for replacing certain nodes of the tree according to the implemented logic. In this implementation, when the same object (note that visitedNodes is an identity hash set) is visited a second time, it is copied into a new object that still carries the same digest. What is the purpose of this step?
Looking further down: when the ExecNodes are generated later, an IdentityHashMap is created to record the Rels that have already been visited, so the generated ExecNodes correspond one-to-one with the Rel objects (not with their digests).
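To make the object-identity vs. digest distinction concrete, here is a tiny sketch using a toy Scan class (not a Flink type): an identity hash set keeps two value-equal objects as separate entries, which is exactly the behaviour the generator relies on.

import com.google.common.collect.Sets

// Toy stand-in for a RelNode whose digest is simply its value.
case class Scan(table: String)

object IdentityDemo extends App {
  val scan1 = Scan("orders")
  val scan2 = Scan("orders") // equal by value ("same digest"), but a different object

  val visited = Sets.newIdentityHashSet[Scan]()
  visited.add(scan1)
  println(visited.contains(scan2)) // false: identity-based, so scan2 counts as "not visited"
  println(scan1 == scan2)          // true: value ("digest") equality
}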

private final Map<FlinkPhysicalRel, ExecNode<?>> visitedRels = new IdentityHashMap();

private ExecNode<?> generate(FlinkPhysicalRel rel, boolean isCompiled) {
    ExecNode<?> execNode = visitedRels.get(rel);
    if (execNode != null) {
        return execNode;
    }

    if (rel instanceof CommonIntermediateTableScan) {
        throw new TableException("Intermediate RelNode can't be converted to ExecNode.");
    }

    List<ExecNode<?>> inputNodes = new ArrayList<>();
    for (RelNode input : rel.getInputs()) {
        inputNodes.add(generate((FlinkPhysicalRel) input, isCompiled));
    }

    execNode = rel.translateToExecNode(isCompiled);

    // connects the input nodes
    List<ExecEdge> inputEdges = new ArrayList<>(inputNodes.size());
    for (ExecNode<?> inputNode : inputNodes) {
        inputEdges.add(ExecEdge.builder().source(inputNode).target(execNode).build());
    }
    execNode.setInputEdges(inputEdges);

    visitedRels.put(rel, execNode);
    return execNode;
}

At this point, splitting one object into two seems even harder to justify: had it stayed a single object, it would have been reused here for free, whereas after the split into two objects it no longer is.
The intent is to first split apart nodes that are referenced multiple times as the same object, and only then decide whether to reuse them, based on equal digests plus a set of internal rules. This way the Flink engine stays in control of which nodes may be merged.
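Here is a runnable toy sketch of the split step, using a hypothetical Node class instead of Flink's RelNode: after the rewrite the two Filter branches point at two distinct Scan objects that still share a digest, which is precisely the state the later digest-based merge starts from.

import com.google.common.collect.Sets

// Hypothetical plan node; the digest is just the structural string of the subtree.
case class Node(name: String, inputs: Seq[Node]) {
  def digest: String = s"$name(${inputs.map(_.digest).mkString(",")})"
}

object SplitDemo extends App {
  private val visited = Sets.newIdentityHashSet[Node]()

  // Mirrors the idea of SameRelObjectShuttle: the second time the same object is
  // reached it is copied, so every parent gets its own child object while the
  // digests stay identical.
  def split(node: Node): Node = {
    val seenBefore = !visited.add(node)
    val newInputs = node.inputs.map(split)
    val changed = newInputs.zip(node.inputs).exists { case (n, o) => n ne o }
    if (seenBefore || changed) node.copy(inputs = newInputs) else node
  }

  val scan = Node("Scan", Nil)
  val join = Node("Join", Seq(Node("Filter1", Seq(scan)), Node("Filter2", Seq(scan))))

  val rewritten = split(join)
  val f1 = rewritten.inputs(0)
  val f2 = rewritten.inputs(1)
  println(f1.inputs.head eq f2.inputs.head)               // false: two distinct Scan objects after the split
  println(f1.inputs.head.digest == f2.inputs.head.digest) // true: digests still match, so they can be merged later
}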

SubplanReuseContext

In the SubplanReuseContext, a ReusableSubplanVisitor builds two mappings:

// mapping a relNode to its digest
private val mapRelToDigest = Maps.newIdentityHashMap[RelNode, String]()
// mapping the digest to RelNodes
private val mapDigestToReusableNodes = new util.HashMap[String, util.List[RelNode]]()

The logic in between is fairly straightforward: traverse the whole tree and look for nodes that can be reused. How do we decide whether a node is reusable?

  • If several RelNodes hang under the same digest, that group of RelNodes has the same semantics, so they are candidates for reuse.
  • The node itself must not have reuse disabled:
/** Returns true if the given node is reusable disabled */
private def isNodeReusableDisabled(node: RelNode): Boolean = {
  node match {
    // TableSourceScan node can not be reused if reuse TableSource disabled
    case _: FlinkLogicalLegacyTableSourceScan | _: CommonPhysicalLegacyTableSourceScan |
        _: FlinkLogicalTableSourceScan | _: CommonPhysicalTableSourceScan =>
      !tableSourceReuseEnabled
    // Exchange node can not be reused if its input is reusable disabled
    case e: Exchange => isNodeReusableDisabled(e.getInput)
    // TableFunctionScan and sink can not be reused
    case _: TableFunctionScan | _: LegacySink | _: Sink => true
    case _ => false
  }
}

For example, a TableFunctionScan can never be reused (I have not figured out the reason for this yet), and an Exchange node is reusable only if its input is reusable.
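The tableSourceReuseEnabled flag in the match above corresponds, as far as I can tell, to the table.optimizer.reuse-source-enabled option (enabled by default). A one-line sketch of turning it off, reusing the tEnv from the sketch at the beginning:

// Assumption: with source reuse disabled, identical table source scans are no longer
// merged, even though their digests are equal.
tEnv.getConfig.getConfiguration.setString("table.optimizer.reuse-source-enabled", "false")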

SubplanReuseShuttle

Once the visit above has finished and we know which nodes can be reused, a final shuttle replaces the reusable nodes:

class SubplanReuseShuttle(context: SubplanReuseContext) extends DefaultRelShuttle {
  private val mapDigestToNewNode = new util.HashMap[String, RelNode]()

  override def visit(rel: RelNode): RelNode = {
    val canReuseOtherNode = context.reuseOtherNode(rel)
    val digest = context.getRelDigest(rel)
    if (canReuseOtherNode) {
      val newNode = mapDigestToNewNode.get(digest)
      if (newNode == null) {
        throw new TableException("This should not happen")
      }
      newNode
    } else {
      val newNode = visitInputs(rel)
      mapDigestToNewNode.put(digest, newNode)
      newNode
    }
  }
}

The implementation records the newNode built for each digest. When a node is allowed to reuse another one, the RelNode already recorded for that digest is returned directly (replacing the node that had the same digest but was a different object), so the reusable parts of the tree are merged back together.
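For the self-join sketch at the beginning, the effect is visible in the explain output: to the best of my knowledge the reused subtree is printed once with a reuse_id and referenced elsewhere via Reused(reference_id=...). The plan below is illustrative only; field names and node details will differ:

Join(joinType=[InnerJoin], where=[=(id, id0)], select=[id, amount, id0, amount0])
:- Exchange(distribution=[hash[id]])
:  +- Calc(select=[id, amount], where=[>(amount, 10)], reuse_id=[1])
:     +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[id, amount])
+- Exchange(distribution=[hash[id]])
   +- Reused(reference_id=[1])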

