8

使用 Java 转换 Apache Avro 为 Parquet 数据格式

 3 years ago
source link: https://yanbin.blog/convert-apache-avro-to-parquet-format-in-java/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

使用 Java 转换 Apache Avro 为 Parquet 数据格式

2021-02-23 — Yanbin

Avro 和 Parquet  是处理数据时常用的两种编码格式,它们同为 Hadoop 大家庭中的成员。这两种格式都是自我描述的,即在数据文件中带有 Schema。Avro 广泛的应用于数据的序列化,如 Kafka,它是基于行的格式,可被流式处理,而 Parquet 是列式存储格式的,适合于基于列的查询,不能用于流式处理。

既然是一个系统中可能同时用到了这两种数据存储格式,那么就可能有它们之间相互转换的需求。本文探索如何从 Avro 转换为 Parquet 格式数据,以 Java 语言为例,所涉及到的话题有

  1. 转换 Avro 数据为 Parquet 文件
  2. 如何支持 Avro 的 LogicalType 类型到 Parquet 的转换, 以 date 类型为例
  3. 实现转换 Avro 数据为 Parquet 字节数组(内存中完成 Avro 到 Parquet 的转换)

本文例子中所选择 Avro 版本是当前最新的 1.10.1

创建 Avro Schema 并编译成 Java 对象

先来创建一个 Avro schema user.avsc, 并编译成 Java 代码。Schema 定义如下

  "namespace": "yanbin.blog.data",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "age", "type": ["null", "double"], "default": null}

编译成 Java 代码

本人在 Mac OS X 平台下用 brew install avro-tools 安装的命令来编译

$ avro-tools compile -string schema user.avsc ./

或者用下载的  avro-tools jar 包来编译

java -jar /path/to/avro-tools-1.10.1.jar -string compile schema user.avsc ./

或者是用配置的 Maven 插件来编译生成 yanbin.blog.data.User 类。

创建 Avro 对象并转换成 Parquet 格式

把在当前目录中生成的 User 类引入到 Java 项目中,本例用 Maven 来管理依赖,在 pom.xml 中引入最基本的依赖

<dependencies>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.11.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-core</artifactId>
        <version>1.2.1</version>
        <exclusions> <!-- hadoop-core 可说是引入了一堆的垃圾,排除所有 -->
            <exclusion>
                <groupId>*</groupId>
                <artifactId>*</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- 补充 hadoop-core 排除的但需要用到的两个包 -->
    <dependency>
        <groupId>commons-logging</groupId>
        <artifactId>commons-logging</artifactId>
        <version>1.2</version>
    </dependency>
    <dependency>
        <groupId>commons-configuration</groupId>
        <artifactId>commons-configuration</artifactId>
        <version>1.6</version>
    </dependency>
</dependencies>

如此,整个项目的依赖就比较干净,用 mvn dependency:tree 命令显示如下:

[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ test-parquet ---
[INFO] blog.yanbin:test-parquet:jar:1.0-SNAPSHOT
[INFO] +- org.apache.avro:avro:jar:1.10.1:compile
[INFO] |  +- com.fasterxml.jackson.core:jackson-core:jar:2.11.3:compile
[INFO] |  +- com.fasterxml.jackson.core:jackson-databind:jar:2.11.3:compile
[INFO] |  |  \- com.fasterxml.jackson.core:jackson-annotations:jar:2.11.3:compile
[INFO] |  +- org.apache.commons:commons-compress:jar:1.20:compile
[INFO] |  \- org.slf4j:slf4j-api:jar:1.7.30:compile
[INFO] +- org.apache.parquet:parquet-avro:jar:1.11.1:compile
[INFO] |  +- org.apache.parquet:parquet-column:jar:1.11.1:compile
[INFO] |  |  +- org.apache.parquet:parquet-common:jar:1.11.1:compile
[INFO] |  |  |  \- org.apache.yetus:audience-annotations:jar:0.11.0:compile
[INFO] |  |  \- org.apache.parquet:parquet-encoding:jar:1.11.1:compile
[INFO] |  +- org.apache.parquet:parquet-hadoop:jar:1.11.1:compile
[INFO] |  |  +- org.apache.parquet:parquet-jackson:jar:1.11.1:compile
[INFO] |  |  +- org.xerial.snappy:snappy-java:jar:1.1.7.3:compile
[INFO] |  |  \- commons-pool:commons-pool:jar:1.6:compile
[INFO] |  \- org.apache.parquet:parquet-format-structures:jar:1.11.1:compile
[INFO] |     \- javax.annotation:javax.annotation-api:jar:1.3.2:compile
[INFO] +- org.apache.hadoop:hadoop-core:jar:1.2.1:compile
[INFO] +- commons-logging:commons-logging:jar:1.2:compile
[INFO] \- commons-configuration:commons-configuration:jar:1.6:compile
[INFO]    +- commons-collections:commons-collections:jar:3.2.1:compile
[INFO]    +- commons-lang:commons-lang:jar:2.4:compile
[INFO]    +- commons-digester:commons-digester:jar:1.8:compile
[INFO]    |  \- commons-beanutils:commons-beanutils:jar:1.7.0:compile
[INFO]    \- commons-beanutils:commons-beanutils-core:jar:1.8.0:compile
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------

Java 代码如下

package yanbin.blog;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import yanbin.blog.data.User;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
public class TestParquet {
    public static void main(String[] args) {
        List<User> users = Arrays.asList(
                User.newBuilder().setId(1).setName("Scott").build(),
                User.newBuilder().setId(2).setName("Tiger").setAge(20.5).build());
        writeToParquet(users);
    public static <T extends SpecificRecordBase> void writeToParquet(List<T> avroObjects) {
        Schema avroSchema = avroObjects.get(0).getSchema();
        String parquetFile = "./users.parquet";
        Path path = new Path(parquetFile);
        try (ParquetWriter<Object> writer = AvroParquetWriter.builder(path)
                .withSchema(avroSchema)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                .build()) {
            avroObjects.forEach(r -> {
                    writer.write(r);
                } catch (IOException ex) {
                    throw new UncheckedIOException(ex);
        } catch (IOException e) {
            e.printStackTrace();

执行后在当前目录下生成了文件 users.parquet, 接下来看看这个文件中都有什么。

类似于安装 avro-tools, 我们可以本 Mac OS X下用 brew install parquet-tools 安装 parquet 的命令,其他平台使用各自的包管理工具来安装 parquet-tools 命令。

查看 users.parquet 中的数据

$ parquet-tools cat --json users.parquet
{"id":1,"name":"Scott"}
{"id":2,"name":"Tiger","age":20.5}

查看 users.parquet 数据的 Schema

$ parquet-tools schema users.parquet
message yanbin.blog.data.User {
  required int32 id;
  required binary name (STRING);
  optional double age;
}

从中可以看到 Avro Schema 到 Parquet Schema 的映射,"int" 为 "int32", "string" 为 binary, 有 default 的字段在 Parquet 中会加上个 optional.

LogicalType 的转换

Avro 和 Parquet 的数据类型都有自己的 LogicType 概念,下面给 user.asvc 加上一个 date LogicalType 的 birthday 字段,整个 user.avsc 内容如下

  "namespace": "yanbin.blog.data",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "age", "type": ["null", "double"], "default": null},
    {"name": "birthday", "type": ["null", {"type": "int", "logicalType": "date"}], "default": null}

再次用 avro-tools 命令编译为 User.java 文件,在 User 类中 birthday 的类型是 java.time.LocalDate。再试图用之前的代码来转换带有 birthday 值的 Avro 对象,把前面 TestParquetmain 方法修改如下:

    public static void main(String[] args) {
        List<User> users = Arrays.asList(
                User.newBuilder().setId(1).setName("Scott").setBirthday(LocalDate.now()).build(),
                User.newBuilder().setId(2).setName("Tiger").setAge(20.5).build());
        writeToParquet(users);

执行后提示错误

Exception in thread "main" java.lang.ClassCastException: java.time.LocalDate cannot be cast to java.lang.Number
    at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:323)
    at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:275)
    at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
    at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
    at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:301)

为了处理这个错误,我还跟到了源代码中,停在 org.apache.avro.generic.GenericData 类中方法

public <T> Conversion<T> getConversionByClass(Class<T> datumClass, LogicalType logicalType)

Map<String, Conversion<?>> conversions = (Map)this.conversionsByClass.get(datumClass);

处给 conversionsByClass 添加个值也能解决这个问题,调试时执行代码

this.conversionsByClass.put(LocalDate.class, ImmutableMap.of("date", new TimeConversions.DateConversion()))

后来仔细检查 AvroParquetWriter  类可以调用 withDataModel(GenericData)  来添加 Conversion, 于是 writeToParquet() 方法的实现如下

    public static <T extends SpecificRecordBase> void writeToParquet(List<T> avroObjects) {
        Schema avroSchema = avroObjects.get(0).getSchema();
        String parquetFile = "./users.parquet";
        Path path = new Path(parquetFile);
        GenericData genericData = GenericData.get();
        genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());
        try (ParquetWriter<Object> writer = AvroParquetWriter.builder(path)
                .withDataModel(genericData)
                .withSchema(avroSchema)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                .build()) {
            avroObjects.forEach(r -> {
                    writer.write(r);
                } catch (IOException ex) {
                    throw new UncheckedIOException(ex);
        } catch (IOException e) {
            e.printStackTrace();

执行 TestParquet  类产生新的 users.parquet  文件,再次查看它的数据和 Schema

$ parquet-tools cat --json users.parquet
{"id":1,"name":"Scott","birthday":18681}
{"id":2,"name":"Tiger","age":20.5}
$ parquet-tools schema users.parquet
message yanbin.blog.data.User {
  required int32 id;
  required binary name (STRING);
  optional double age;
  optional int32 birthday (DATE);
}

其他的 LogicalType 应该也可以采用类似的方式来处理。如果我们查看 Conversion 的实现类有以下 9 个

avro-parquet-1.png?resize=586%2C206&ssl=1
必要是想必可以实现自己的 Conversion 类

其实 Parquet 也有自己的 LogicalType 定义,如 MAP,DECIMAL, DATE, TIME, TIMESTAMP 等,如何把 Avro 的 date 类型映射为 Parquet 的 DATE 类型也是个问题。

如何在内存中完成 Avro 转换为 Parquet 

以上转换 Avro 为 Parquet 需要生成一个文件,是否能在内存中完成 Avro 到 Parquet 格式的转换呢?即要得到 Parquet  内容的字节数组而不借助于磁盘文件。突破口应该是在 AvroParquetWriter.builder() 这个方法上,它有两个重载方法,分别是

  public static <T> Builder<T> builder(Path file) {
    return new Builder<T>(file);
  public static <T> Builder<T> builder(OutputFile file) {
    return new Builder<T>(file);

Path 是一个 org.apache.hadoop.fs.Path 类, 而非 Java 的 Path,这个估计不行,OutputFile 是一个接口 org.apache.parquet.io.OutputFile, 看起来有戏,它目前只有一个实现类 org.apache.parquet.hadoop.util.HadoopOutputFile, 要实现内存中存储字节类容的 OutputFile 肯定是可行的。

继续追踪 OutputFile 的 create 方法的返回值为 PositionOutputStream, 它有两个实现 DelegatingPositionOutputStreamHadoopPositionOutputStream, 前者是一个适配器. DelegatingPositionOutputStream 的构造函数需要一个  OutputStream,把它换成一个 ByteArrayOutpuStream 就能在内存中处理了。最后只要实现 PositionOutputStream  的 long getPos() 返回 ByteArrayOutputStream  的当前位置就行。

具体实现需要创建一个  InMemoryOutputFileInMemoryPositionOutputStream  类, 写在一个类文件  InMemoryOutputFile.java 中

package yanbin.blog;
import org.apache.parquet.io.DelegatingPositionOutputStream;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
public class InMemoryOutputFile implements OutputFile {
    private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    @Override
    public PositionOutputStream create(long blockSizeHint) throws IOException { // Mode.CREATE 会调用此方法
        return new InMemoryPositionOutputStream(baos);
    @Override
    public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
        return null;
    @Override
    public boolean supportsBlockSize() {
        return false;
    @Override
    public long defaultBlockSize() {
        return 0;
    public byte[] toArray() {
        return baos.toByteArray();
    private static class InMemoryPositionOutputStream extends DelegatingPositionOutputStream {
        public InMemoryPositionOutputStream(OutputStream outputStream) {
            super(outputStream);
        @Override
        public long getPos() throws IOException {
            return ((ByteArrayOutputStream) this.getStream()).size();

应用 InMemoryOutputFile  的 writeToParquet() 方法,为验证内存中的内容是否正确,我们把 outputFile.toArray() 输出到 users-memory.parquet 文件

    public static <T extends SpecificRecordBase> void writeToParquet(List<T> avroObjects) throws IOException {
        Schema avroSchema = avroObjects.get(0).getSchema();
        GenericData genericData = GenericData.get();
        genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());
        InMemoryOutputFile outputFile = new InMemoryOutputFile();
        try (ParquetWriter<Object> writer = AvroParquetWriter.builder(outputFile)
                .withDataModel(genericData)
                .withSchema(avroSchema)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .withWriteMode(ParquetFileWriter.Mode.CREATE) // 内存中处理什么 Mode 无所谓
                .build()) {
            avroObjects.forEach(r -> {
                    writer.write(r);
                } catch (IOException ex) {
                    throw new UncheckedIOException(ex);
        } catch (IOException e) {
            e.printStackTrace();
        Files.write(Paths.get("./users-memory.parquet"), outputFile.toArray());

执行 TestParquest 后生成  users-memory.parquet 文件,现在是有点激动人心的时刻,验证内存中的内容是否正确

$ ls -l *.parquet
-rw-r--r-- 1 yanbin root 1173 Feb 23 23:19 users-memory.parquet
-rwxrwxrwx 1 yanbin root 1173 Feb 23 22:49 users.parquet
$
$ parquet-tools cat --json users-memory.parquet
{"id":1,"name":"Scott","birthday":18681}
{"id":2,"name":"Tiger","age":20.5}
$
$ parquet-tools schema users-memory.parquet
message yanbin.blog.data.User {
  required int32 id;
  required binary name (STRING);
  optional double age;
  optional int32 birthday (DATE);
}

文件大小相同,内容和 Schema 都无误,大功告成,洗洗睡了。

爬起来再被一句,内存中处理的话小心消耗内存,快速处理小文件不错,避免了磁盘 IO 操作,但大文件就会要命的。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK