Avro 和 Parquet 是处理数据时常用的两种编码格式,它们同为 Hadoop 大家庭中的成员。这两种格式都是自我描述的,即在数据文件中带有 Schema。Avro 广泛的应用于数据的序列化,如 Kafka,它是基于行的格式,可被流式处理,而 Parquet 是列式存储格式的,适合于基于列的查询,不能用于流式处理。
既然是一个系统中可能同时用到了这两种数据存储格式,那么就可能有它们之间相互转换的需求。本文探索如何从 Avro 转换为 Parquet 格式数据,以 Java 语言为例,所涉及到的话题有
- 转换 Avro 数据为 Parquet 文件
- 如何支持 Avro 的 LogicalType 类型到 Parquet 的转换, 以 date 类型为例
- 实现转换 Avro 数据为 Parquet 字节数组(内存中完成 Avro 到 Parquet 的转换)
本文例子中所选择 Avro 版本是当前最新的 1.10.1
创建 Avro Schema 并编译成 Java 对象
先来创建一个 Avro schema user.avsc
, 并编译成 Java 代码。Schema 定义如下
"namespace": "yanbin.blog.data",
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "age", "type": ["null", "double"], "default": null}
编译成 Java 代码
本人在 Mac OS X 平台下用 brew install avro-tools
安装的命令来编译
$ avro-tools compile -string schema user.avsc ./
或者用下载的 avro-tools jar 包来编译
java -jar /path/to/avro-tools-1.10.1.jar -string compile schema user.avsc ./
或者是用配置的 Maven 插件来编译生成 yanbin.blog.data.User
类。
创建 Avro 对象并转换成 Parquet 格式
把在当前目录中生成的 User 类引入到 Java 项目中,本例用 Maven 来管理依赖,在 pom.xml 中引入最基本的依赖
<dependencies>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.11.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
<exclusions> <!-- hadoop-core 可说是引入了一堆的垃圾,排除所有 -->
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 补充 hadoop-core 排除的但需要用到的两个包 -->
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
<version>1.6</version>
</dependency>
</dependencies>
如此,整个项目的依赖就比较干净,用 mvn dependency:tree
命令显示如下:
[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ test-parquet ---
[INFO] blog.yanbin:test-parquet:jar:1.0-SNAPSHOT
[INFO] +- org.apache.avro:avro:jar:1.10.1:compile
[INFO] | +- com.fasterxml.jackson.core:jackson-core:jar:2.11.3:compile
[INFO] | +- com.fasterxml.jackson.core:jackson-databind:jar:2.11.3:compile
[INFO] | | \- com.fasterxml.jackson.core:jackson-annotations:jar:2.11.3:compile
[INFO] | +- org.apache.commons:commons-compress:jar:1.20:compile
[INFO] | \- org.slf4j:slf4j-api:jar:1.7.30:compile
[INFO] +- org.apache.parquet:parquet-avro:jar:1.11.1:compile
[INFO] | +- org.apache.parquet:parquet-column:jar:1.11.1:compile
[INFO] | | +- org.apache.parquet:parquet-common:jar:1.11.1:compile
[INFO] | | | \- org.apache.yetus:audience-annotations:jar:0.11.0:compile
[INFO] | | \- org.apache.parquet:parquet-encoding:jar:1.11.1:compile
[INFO] | +- org.apache.parquet:parquet-hadoop:jar:1.11.1:compile
[INFO] | | +- org.apache.parquet:parquet-jackson:jar:1.11.1:compile
[INFO] | | +- org.xerial.snappy:snappy-java:jar:1.1.7.3:compile
[INFO] | | \- commons-pool:commons-pool:jar:1.6:compile
[INFO] | \- org.apache.parquet:parquet-format-structures:jar:1.11.1:compile
[INFO] | \- javax.annotation:javax.annotation-api:jar:1.3.2:compile
[INFO] +- org.apache.hadoop:hadoop-core:jar:1.2.1:compile
[INFO] +- commons-logging:commons-logging:jar:1.2:compile
[INFO] \- commons-configuration:commons-configuration:jar:1.6:compile
[INFO] +- commons-collections:commons-collections:jar:3.2.1:compile
[INFO] +- commons-lang:commons-lang:jar:2.4:compile
[INFO] +- commons-digester:commons-digester:jar:1.8:compile
[INFO] | \- commons-beanutils:commons-beanutils:jar:1.7.0:compile
[INFO] \- commons-beanutils:commons-beanutils-core:jar:1.8.0:compile
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
Java 代码如下
package yanbin.blog;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import yanbin.blog.data.User;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
public class TestParquet {
public static void main(String[] args) {
List<User> users = Arrays.asList(
User.newBuilder().setId(1).setName("Scott").build(),
User.newBuilder().setId(2).setName("Tiger").setAge(20.5).build());
writeToParquet(users);
public static <T extends SpecificRecordBase> void writeToParquet(List<T> avroObjects) {
Schema avroSchema = avroObjects.get(0).getSchema();
String parquetFile = "./users.parquet";
Path path = new Path(parquetFile);
try (ParquetWriter<Object> writer = AvroParquetWriter.builder(path)
.withSchema(avroSchema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.build()) {
avroObjects.forEach(r -> {
writer.write(r);
} catch (IOException ex) {
throw new UncheckedIOException(ex);
} catch (IOException e) {
e.printStackTrace();
执行后在当前目录下生成了文件 users.parquet
, 接下来看看这个文件中都有什么。
类似于安装 avro-tools
, 我们可以本 Mac OS X下用 brew install parquet-tools
安装 parquet 的命令,其他平台使用各自的包管理工具来安装 parquet-tools 命令。
查看 users.parquet
中的数据
$ parquet-tools cat --json users.parquet
{"id":1,"name":"Scott"}
{"id":2,"name":"Tiger","age":20.5}
查看 users.parquet
数据的 Schema
$ parquet-tools schema users.parquet
message yanbin.blog.data.User {
required int32 id;
required binary name (STRING);
optional double age;
}
从中可以看到 Avro Schema 到 Parquet Schema 的映射,"int" 为 "int32", "string" 为 binary, 有 default 的字段在 Parquet 中会加上个 optional.
LogicalType 的转换
Avro 和 Parquet 的数据类型都有自己的 LogicType 概念,下面给 user.asvc
加上一个 date
LogicalType 的 birthday 字段,整个 user.avsc
内容如下
"namespace": "yanbin.blog.data",
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "age", "type": ["null", "double"], "default": null},
{"name": "birthday", "type": ["null", {"type": "int", "logicalType": "date"}], "default": null}
再次用 avro-tools
命令编译为 User.java
文件,在 User 类中 birthday
的类型是 java.time.LocalDate
。再试图用之前的代码来转换带有 birthday
值的 Avro 对象,把前面 TestParquet
的 main
方法修改如下:
public static void main(String[] args) {
List<User> users = Arrays.asList(
User.newBuilder().setId(1).setName("Scott").setBirthday(LocalDate.now()).build(),
User.newBuilder().setId(2).setName("Tiger").setAge(20.5).build());
writeToParquet(users);
执行后提示错误
Exception in thread "main" java.lang.ClassCastException: java.time.LocalDate cannot be cast to java.lang.Number
at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:323)
at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:275)
at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:301)
为了处理这个错误,我还跟到了源代码中,停在 org.apache.avro.generic.GenericData
类中方法
public <T> Conversion<T> getConversionByClass(Class<T> datumClass, LogicalType logicalType)
Map<String, Conversion<?>> conversions = (Map)this.conversionsByClass.get(datumClass);
处给 conversionsByClass
添加个值也能解决这个问题,调试时执行代码
this.conversionsByClass.put(LocalDate.class, ImmutableMap.of("date", new TimeConversions.DateConversion()))
后来仔细检查 AvroParquetWriter
类可以调用 withDataModel(GenericData)
来添加 Conversion, 于是 writeToParquet()
方法的实现如下
public static <T extends SpecificRecordBase> void writeToParquet(List<T> avroObjects) {
Schema avroSchema = avroObjects.get(0).getSchema();
String parquetFile = "./users.parquet";
Path path = new Path(parquetFile);
GenericData genericData = GenericData.get();
genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());
try (ParquetWriter<Object> writer = AvroParquetWriter.builder(path)
.withDataModel(genericData)
.withSchema(avroSchema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.build()) {
avroObjects.forEach(r -> {
writer.write(r);
} catch (IOException ex) {
throw new UncheckedIOException(ex);
} catch (IOException e) {
e.printStackTrace();
执行 TestParquet 类产生新的 users.parquet
文件,再次查看它的数据和 Schema
$ parquet-tools cat --json users.parquet
{"id":1,"name":"Scott","birthday":18681}
{"id":2,"name":"Tiger","age":20.5}
$ parquet-tools schema users.parquet
message yanbin.blog.data.User {
required int32 id;
required binary name (STRING);
optional double age;
optional int32 birthday (DATE);
}
其他的 LogicalType 应该也可以采用类似的方式来处理。如果我们查看 Conversion 的实现类有以下 9 个
必要是想必可以实现自己的 Conversion 类
其实 Parquet 也有自己的 LogicalType 定义,如 MAP,DECIMAL, DATE, TIME, TIMESTAMP 等,如何把 Avro 的 date 类型映射为 Parquet 的 DATE 类型也是个问题。
如何在内存中完成 Avro 转换为 Parquet
以上转换 Avro 为 Parquet 需要生成一个文件,是否能在内存中完成 Avro 到 Parquet 格式的转换呢?即要得到 Parquet 内容的字节数组而不借助于磁盘文件。突破口应该是在 AvroParquetWriter.builder()
这个方法上,它有两个重载方法,分别是
public static <T> Builder<T> builder(Path file) {
return new Builder<T>(file);
public static <T> Builder<T> builder(OutputFile file) {
return new Builder<T>(file);
Path 是一个 org.apache.hadoop.fs.Path
类, 而非 Java 的 Path,这个估计不行,OutputFile 是一个接口 org.apache.parquet.io.OutputFile
, 看起来有戏,它目前只有一个实现类 org.apache.parquet.hadoop.util.HadoopOutputFile
, 要实现内存中存储字节类容的 OutputFile
肯定是可行的。
继续追踪 OutputFile 的 create 方法的返回值为 PositionOutputStream
, 它有两个实现 DelegatingPositionOutputStream
和 HadoopPositionOutputStream
, 前者是一个适配器. DelegatingPositionOutputStream
的构造函数需要一个 OutputStream
,把它换成一个 ByteArrayOutpuStream
就能在内存中处理了。最后只要实现 PositionOutputStream
的 long getPos()
返回 ByteArrayOutputStream
的当前位置就行。
具体实现需要创建一个 InMemoryOutputFile
和 InMemoryPositionOutputStream
类, 写在一个类文件 InMemoryOutputFile.java
中
package yanbin.blog;
import org.apache.parquet.io.DelegatingPositionOutputStream;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
public class InMemoryOutputFile implements OutputFile {
private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
@Override
public PositionOutputStream create(long blockSizeHint) throws IOException { // Mode.CREATE 会调用此方法
return new InMemoryPositionOutputStream(baos);
@Override
public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
return null;
@Override
public boolean supportsBlockSize() {
return false;
@Override
public long defaultBlockSize() {
return 0;
public byte[] toArray() {
return baos.toByteArray();
private static class InMemoryPositionOutputStream extends DelegatingPositionOutputStream {
public InMemoryPositionOutputStream(OutputStream outputStream) {
super(outputStream);
@Override
public long getPos() throws IOException {
return ((ByteArrayOutputStream) this.getStream()).size();
应用 InMemoryOutputFile
的 writeToParquet()
方法,为验证内存中的内容是否正确,我们把 outputFile.toArray() 输出到 users-memory.parquet 文件
public static <T extends SpecificRecordBase> void writeToParquet(List<T> avroObjects) throws IOException {
Schema avroSchema = avroObjects.get(0).getSchema();
GenericData genericData = GenericData.get();
genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());
InMemoryOutputFile outputFile = new InMemoryOutputFile();
try (ParquetWriter<Object> writer = AvroParquetWriter.builder(outputFile)
.withDataModel(genericData)
.withSchema(avroSchema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withWriteMode(ParquetFileWriter.Mode.CREATE) // 内存中处理什么 Mode 无所谓
.build()) {
avroObjects.forEach(r -> {
writer.write(r);
} catch (IOException ex) {
throw new UncheckedIOException(ex);
} catch (IOException e) {
e.printStackTrace();
Files.write(Paths.get("./users-memory.parquet"), outputFile.toArray());
执行 TestParquest
后生成 users-memory.parquet
文件,现在是有点激动人心的时刻,验证内存中的内容是否正确
$ ls -l *.parquet
-rw-r--r-- 1 yanbin root 1173 Feb 23 23:19 users-memory.parquet
-rwxrwxrwx 1 yanbin root 1173 Feb 23 22:49 users.parquet
$
$ parquet-tools cat --json users-memory.parquet
{"id":1,"name":"Scott","birthday":18681}
{"id":2,"name":"Tiger","age":20.5}
$
$ parquet-tools schema users-memory.parquet
message yanbin.blog.data.User {
required int32 id;
required binary name (STRING);
optional double age;
optional int32 birthday (DATE);
}
文件大小相同,内容和 Schema 都无误,大功告成,洗洗睡了。
爬起来再被一句,内存中处理的话小心消耗内存,快速处理小文件不错,避免了磁盘 IO 操作,但大文件就会要命的。