1

Apache Iceberg 源码分析:schema 进化

 2 years ago
source link: https://segmentfault.com/a/1190000041410659
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Apache Iceberg 源码分析:schema 进化

发布于 今天 15:15

前置:需要对 iceberg schema 信息有基本了解

Time: 2022/02/15 第一版

能做什么?为什么能这么做?

  • 表任意位置新增字段
  • 表任务位置删除字段
  • 分区字段变更

思考的问题

  • 中间字段删除后,历史的数据对应的字段怎么读?
  • 中间字段新增后,历史的数据怎么读?
  • 分区变更后,历史和新增数据怎么读?

iceberg 写 parquet 文件格式分析
image.png

如上图,Propeties 中含有本次写数据时 schema 的信息。原本表的字段为 id,name,age,dt,删除 name 字段后写入的数据文件,id 分别保留 iceberg 原本的 id,1,3,4,这时候如果我在 id 后面新增一个字段,iceberg.schema 就会变成 {"id":1,"name":"id","required":true,"type":"int"},{"id":5,"name":"new_col","required":true,"type":"string"},{"id":3,"name":"age","required":true,"type":"int"},{"id":4,"name":"dt","required":true,"type":"string"}

新增的列 id 从 5 开始

这里结合 Spark 的部分实现方式分析

RowDataReader
private CloseableIterable<InternalRow> newParquetIterable(
      InputFile location,
      FileScanTask task,
      Schema readSchema,
      Map<Integer, ?> idToConstant) {
    Parquet.ReadBuilder builder = Parquet.read(location)
        .reuseContainers()
        .split(task.start(), task.length())
        .project(readSchema)
        // 这里创建 Parquet reader 进行 parquet 读取
        .createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant))
        .filter(task.residual())
        .caseSensitive(caseSensitive);

    if (nameMapping != null) {
      builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
    }

    return builder.build();
  }
SparkParquetReaders.buildReader
@SuppressWarnings("unchecked")
  public static ParquetValueReader<InternalRow> buildReader(Schema expectedSchema,
                                                            MessageType fileSchema,
                                                            Map<Integer, ?> idToConstant) {
    // 判断是否含有 iceberg.id,也就是上面 parquet 文件中 iceberg.schema 中的字段 id
    if (ParquetSchemaUtil.hasIds(fileSchema)) {
      // 有的话,递归对每个 expected schema(我们想查询的字段)构建对应的 Parquet Reader
      return (ParquetValueReader<InternalRow>)
          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
              new ReadBuilder(fileSchema, idToConstant));
    } else {
      return (ParquetValueReader<InternalRow>)
          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
              new FallbackReadBuilder(fileSchema, idToConstant));
    }
  }
ReadBuilder

Visitor 模式,直接看 ReaderBuilder 的实现,截取比较重要的 struct 方法的实现

  • expectSchema: <id:1, name: id> <id:5, name: new_col> <id:3, name:age>
  • struct: <id:1, name: id><id:2, name: old_col> <id:3, name:age>
  • 结果:InternalRowReader:<int, intReader>,<null, nullReader>,<int, intReader>
@Override
public ParquetValueReader<?> struct(Types.StructType expected, GroupType struct,
                                    List<ParquetValueReader<?>> fieldReaders) {
  // expected 是我们 select 等需要的 schema,struct 是数据 parquet 文件的 schema
  // fieldReaders 是根据 parquet schema 创建出来的 readers,比如 string 对应 StringReader
  // match the expected struct's order
  Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
  Map<Integer, Type> typesById = Maps.newHashMap();
  List<Type> fields = struct.getFields();
  // 构建 fileSchema id 到 reader 和 type 的映射
  for (int i = 0; i < fields.size(); i += 1) {
    Type fieldType = fields.get(i);
    int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
    if (fieldType.getId() != null) {
      int id = fieldType.getId().intValue();
      readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
      typesById.put(id, fieldType);
    }
  }

  // 遍历 expect schema
  List<Types.NestedField> expectedFields = expected != null ?
    expected.fields() : ImmutableList.of();
  List<ParquetValueReader<?>> reorderedFields = Lists.newArrayListWithExpectedSize(
    expectedFields.size());
  List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
  for (Types.NestedField field : expectedFields) {
    int id = field.fieldId();
    // 这个静态字段,不需要从文件中读取,比如分区指定了一个值,或者字段
    if (idToConstant.containsKey(id)) {
      // containsKey is used because the constant may be null
      reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
      types.add(null);
    } else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
      reorderedFields.add(ParquetValueReaders.position());
      types.add(null);
    } else if (id == MetadataColumns.IS_DELETED.fieldId()) {
      reorderedFields.add(ParquetValueReaders.constant(false));
      types.add(null);
    } else {
      // 从 file schema 构建的 reader 中,通过 id 获取
      ParquetValueReader<?> reader = readersById.get(id);
      if (reader != null) {
        reorderedFields.add(reader);
        types.add(typesById.get(id));
      } else {
        // 如果 file schema 中没有对应的 iceberg.id,也就是我们上面例子中 id 为 5 的 new_col,那么 iceberg 会把 NULL Reader 放入 reorderedFields,type 类型为 null,读出来的数据为 null 值
        reorderedFields.add(ParquetValueReaders.nulls());
        types.add(null);
      }
    }
  }

  return new InternalRowReader(types, reorderedFields);
}

从上面代码来看,iceberg 在写入数据时,会带上此时表 schema 的 id,require 相关信息,读取的时候,会把当前表的 schema 和 数据文件中的 schema id 进行比对添加。

  • 对于 rename,iceberg 不变更 id,更不影响查询
  • 对于任意位置新增删除字段,都是通过 id 映射,就算新增同名字段,对 iceberg 来说是新的 id。所以不影响其它字段的查询,也不会有数据偏移的问题
  • 反过来再思考前面遗留的问题

才疏学浅,有问题欢迎指出讨论


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK