7

时序数据库Influx-IOx源码学习十二(物理计划的执行)

 3 years ago
source link: https://my.oschina.net/u/3374539/blog/5047399
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client
时序数据库Influx-IOx源码学习十二(物理计划的执行) - 刘涛华的个人空间 - OSCHINA - 中文开源技术交流社区

InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。

InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。

接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。


上一章介绍了一个SQL是怎样从字符串转换到物理执行计划的,详情见:

https://my.oschina.net/u/3374539/blog/5035628

这一章主要记录一下物理计划是怎样执行的。


在上一篇文章的末尾,我们展示了物理计划之中存储的数据,这些数据代表了当前整个数据库中,能够与用户输入的查询表相关联的所有数据。

对于一般数据库来讲,在物理计划中更应该是指向索引相关的信息,举例来说:select * from table1 ,在物理计划里,应该是要拿到table1的表描述、存储数据的文件路径、文件大小、等等,而不是拿到真实数据。在文章最末尾中,有一段省略的数据,为什么会出现数据呢?其实这是数据库设计的缓存,缓存的数据本来就没有落到磁盘上,所以直接在物理计划中也会持有RBChunk和MBChunk的数据引用。

对于一个过滤而言,会在物理计划中产生对应的信息,展示如下:

select * from myMeasurement where fieldKey like 'value1';

input: FilterExec { predicate: BinaryExpr { left: Column { name: "fieldKey" }, op: Like, right: Literal { value: Utf8("value1") } }

接下来看物理计划的执行代码:

pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
    match plan.output_partitioning().partition_count() {
        0 => Ok(vec![]),
        //单一块的时候直接取出数据
        1 => {
            let it = plan.execute(0).await?;
            common::collect(it).await
        }
        //多个数据块的时候就需要进行合并数据
        _ => {
            let plan = MergeExec::new(plan.clone());
            assert_eq!(1, plan.output_partitioning().partition_count());
            //这里分为了两步execute 和 collect
            common::collect(plan.execute(0).await?).await
        }
    }
}

接下来看plan.execute方法:

async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
       。。。省略
      tokio::spawn(async move {
           //这里的input就代表了上面展示的filter的input或者是数据的input
          let mut stream = match input.execute(part_i).await {
              Err(e) => {
                  let arrow_error = ArrowError::ExternalError(Box::new(e));
                  sender.send(Err(arrow_error)).await.ok();
                  return;
              }
              Ok(stream) => stream,
          };
          //计划执行完成之后返回一个stream,这里就是一直next获取完
          while let Some(item) = stream.next().await {
              sender.send(item).await.ok();
          }
      });
      。。。省略
}

上面的input代表了以下这么多东西:

上面展示的为datafusion框架里的Plan,也就是通用sql都需要实现的功能,下面是iox项目中实现的Plan是完成数据获取的。

Plan之间的关系是嵌套的,想象一下上一章的大图,比如coalesceBatchesExec里可能还会包含filter,主要就是描述整个sql语句中都出现了什么。所有出现的plan就会对数据进行一次全面的过滤。

姑且不看过滤的细节,只看获取数据的部分(ExecutionPlan for IOxReadFilterNode)。

async fn execute(
        &self,
        partition: usize,
    ) -> datafusion::error::Result<SendableRecordBatchStream> {
        //因为在前面物理计划中得到了所有列,这里拿出列的名字
        let fields = self.schema.fields();
        let selection_cols = fields.iter().map(|f| f.name() as &str).collect::<Vec<_>>();
        //多个分区的时候可以根据分区号拿出chunk信息
        let ChunkInfo {
            chunk,
            chunk_table_schema,
        } = &self.chunk_and_infos[partition];

        //过滤出来列名字对应的arrow的filed,这里就存在不对应的问题,假如用户输入了ABC,但是chunk_table_schema中并不存在,这里就会是一个空
        let selection_cols = restrict_selection(selection_cols, &chunk_table_schema);
        let selection = Selection::Some(&selection_cols);
        //使用predicate过滤一次,但是我调试的时候一直是空的,也就是查询出所有数据。
        let stream = chunk
            .read_filter(&self.table_name, &self.predicate, selection)
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Error creating scan for table {} chunk {}: {}",
                    self.table_name,
                    chunk.id(),
                    e
                ))
            })?;
        //这里使用SchemaAdapterStream的结构来填充空值列
        let adapter = SchemaAdapterStream::try_new(stream, Arc::clone(&self.schema))
            .map_err(|e| DataFusionError::Internal(e.to_string()))?;

        Ok(Box::pin(adapter))
    }

这个SchemaAdapterStream在代码中给了一个特别形象的描述:

///
///                       ┌────────────────┐                         ┌─────────────────────────┐
///                       │ ┌─────┐┌─────┐ │                         │ ┌─────┐┌──────┐┌─────┐  │
///                       │ │  A  ││  C  │ │                         │ │  A  ││  B   ││  C  │  │
///                       │ │  -  ││  -  │ │                         │ │  -  ││  -   ││  -  │  │
/// ┌──────────────┐      │ │  1  ││ 10  │ │     ┌──────────────┐    │ │  1  ││ NULL ││ 10  │  │
/// │    Input     │      │ │  2  ││ 20  │ │     │   Adapter    │    │ │  2  ││ NULL ││ 20  │  │
/// │    Stream    ├────▶ │ │  3  ││ 30  │ │────▶│    Stream    ├───▶│ │  3  ││ NULL ││ 30  │  │
/// └──────────────┘      │ │  4  ││ 40  │ │     └──────────────┘    │ │  4  ││ NULL ││ 40  │  │
///                       │ └─────┘└─────┘ │                         │ └─────┘└──────┘└─────┘  │
///                       │                │                         │                         │
///                       │  Record Batch  │                         │      Record Batch       │
///                       └────────────────┘                         └─────────────────────────┘
///

接下来看如何实现数据查找的:

fn read_filter(
        &self,
        table_name: &str,
        predicate: &Predicate,
        selection: Selection<'_>,
    ) -> Result<SendableRecordBatchStream, Self::Error> {
         //chunk存在变体,这里就是先判断是什么chunk,有三种MB,RB,ParquetFile
        match self {
            //还是在写入阶段的buffer,暂时不支持查询条件
            Self::MutableBuffer { chunk, .. } => {
                if !predicate.is_empty() {
                    return InternalPredicateNotSupported {
                        predicate: predicate.clone(),
                    }
                    .fail();
                }
                let batch = chunk
                    .read_filter(table_name, selection)
                    .context(MutableBufferChunk)?;

                Ok(Box::pin(MemoryStream::new(vec![batch])))
            }
            //不可写阶段的buffer,对数据进行过滤
            Self::ReadBuffer { chunk, .. } => {
                let rb_predicate =
                    to_read_buffer_predicate(&predicate).context(PredicateConversion)?;
                //读取数据并过滤
                let read_results = chunk
                    .read_filter(table_name, rb_predicate, selection)
                    .context(ReadBufferChunkError {
                        chunk_id: chunk.id(),
                    })?;
                //读取schema信息并过滤
                let schema = chunk
                    .read_filter_table_schema(table_name, selection)
                    .context(ReadBufferChunkError {
                        chunk_id: chunk.id(),
                    })?;
                //ReadFilterResultsStream是对不同的chunk类型实现的读取接口
                Ok(Box::pin(ReadFilterResultsStream::new(
                    read_results,
                    schema.into(),
                )))
            }
            //Parquet同理
            Self::ParquetFile { chunk, .. } => chunk
                .read_filter(table_name, predicate, selection)
                .context(ParquetFileChunkError {
                    chunk_id: chunk.id(),
                }),
        }
    }

数据到了这里就会按照你选择的表名、列名,将数据全部查询出来了。在代码中的predicate,一直是空的,暂时不确定是如何填充的,后面再看。

数据从这里全部查询出来之后,会返回给datafusion框架,继续按照开头写到的过滤器进行过滤,就是遍历一遍数据判断大于、小于或者like等等。

好了查询就先写到这里。

祝玩儿的开心!!


欢迎关注微信公众号:

或添加微信好友: liutaohua001


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK