
Converting Apache Avro to Parquet in Java (Dependency Update)

source link: https://yanbin.blog/convert-apache-avro-to-parquet-in-java-use-hadoop-common-instead-of-hadoop-core/


2021-02-25 — Yanbin

The previous post, Converting Apache Avro to Parquet in Java, showed how to convert Avro data into a Parquet file or an in-memory byte array, with LogicalType support. That implementation relied on the hadoop-core dependency, and its transitive dependencies are all very dated; a glance at the official Maven repository shows it is not just a little old.

(Screenshot: the hadoop-core listing on Maven Central)

A project that has been neglected for this long must have a replacement. Sure enough, hadoop-core was reportedly renamed to hadoop-common in July 2009. I couldn't find an official statement, only the StackOverflow question Differences between Hadoop-common, Hadoop-core and Hadoop-client?, which says as much. That account seems right; otherwise, why would hadoop-core have stayed at version 1.2.1, and why can the classes that used to live in hadoop-core, such as org.apache.hadoop.fs.Path, all be found in hadoop-common? One thing is missing, though: the fs/s3 package from hadoop-core-1.2.1 is gone, and with it that important S3 filesystem support.

So, following up on the previous post, this one uses the still-maintained hadoop-common package and gives the respective pom.xml dependency configurations for converting Avro to a Parquet file and to an in-memory byte array. The code itself is the same as in the previous post, Converting Apache Avro to Parquet in Java.

Dependencies for converting Avro to a Parquet file

Dependency configuration in pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.11.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.3.0</version>
        <exclusions>
            <exclusion>
                <groupId>*</groupId>
                <artifactId>*</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-auth</artifactId>
        <version>3.3.0</version>
    </dependency>
    <dependency>
        <groupId>commons-logging</groupId>
        <artifactId>commons-logging</artifactId>
        <version>1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.htrace</groupId>
        <artifactId>htrace-core4</artifactId>
        <version>4.1.0-incubating</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-configuration2</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.woodstox</groupId>
        <artifactId>woodstox-core</artifactId>
        <version>5.0.3</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>30.1-jre</version>
    </dependency>
    <dependency>
        <groupId>commons-collections</groupId>
        <artifactId>commons-collections</artifactId>
        <version>3.2.2</version>
    </dependency>
</dependencies>

Here, once again, is the code that converts Avro to a Parquet file:

public static <T extends SpecificRecordBase> void writeToParquetFile(List<T> avroObjects) {
    Schema avroSchema = avroObjects.get(0).getSchema();
    GenericData genericData = GenericData.get();
    // Register the date LogicalType conversion so LocalDate fields are written correctly
    genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());
    Path path = new Path("users.parquet");
    try (ParquetWriter<Object> writer = AvroParquetWriter.builder(path)
            .withDataModel(genericData)
            .withSchema(avroSchema)
            .withCompressionCodec(CompressionCodecName.SNAPPY)
            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
            .build()) {
        avroObjects.forEach(r -> {
            try {
                writer.write(r);
            } catch (IOException ex) {
                throw new UncheckedIOException(ex);
            }
        });
    } catch (IOException e) {
        e.printStackTrace();
    }
}

The AvroParquetWriter.builder() method used here needs the hadoop-common class org.apache.hadoop.fs.Path.
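
To round this out, here is a minimal usage sketch. The User class, its fields, and its builder methods are hypothetical stand-ins for whatever Avro-generated SpecificRecordBase class you actually have:

// Hypothetical usage: User is assumed to be an Avro-generated SpecificRecordBase class
// with fields name (string), favoriteNumber (int) and birthDate (date LogicalType -> LocalDate)
List<User> users = Arrays.asList(
        User.newBuilder().setName("Alice").setFavoriteNumber(7)
                .setBirthDate(LocalDate.of(1990, 1, 1)).build(),
        User.newBuilder().setName("Bob").setFavoriteNumber(42)
                .setBirthDate(LocalDate.of(1985, 6, 15)).build());
writeToParquetFile(users);   // produces users.parquet in the current working directory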

Dependencies for converting Avro to an in-memory byte array

pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.11.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.3.0</version>
        <exclusions>
            <exclusion>
                <groupId>*</groupId>
                <artifactId>*</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.woodstox</groupId>
        <artifactId>woodstox-core</artifactId>
        <version>5.0.3</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>30.1-jre</version>
    </dependency>
    <dependency>
        <groupId>commons-collections</groupId>
        <artifactId>commons-collections</artifactId>
        <version>3.2.2</version>
    </dependency>
</dependencies>

This needs a few fewer dependencies than writing a Parquet file above.

And here, once more, is the code that converts to a Parquet byte array entirely in memory:

public static <T extends SpecificRecordBase> byte[] writeToParquetByteArray(List<T> avroObjects) throws IOException {
    Schema avroSchema = avroObjects.get(0).getSchema();
    GenericData genericData = GenericData.get();
    genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());
    // Write to an in-memory OutputFile instead of a path on disk
    InMemoryOutputFile outputFile = new InMemoryOutputFile();
    try (ParquetWriter<Object> writer = AvroParquetWriter.builder(outputFile)
            .withDataModel(genericData)
            .withSchema(avroSchema)
            .withCompressionCodec(CompressionCodecName.SNAPPY)
            .withWriteMode(ParquetFileWriter.Mode.CREATE)
            .build()) {
        avroObjects.forEach(r -> {
            try {
                writer.write(r);
            } catch (IOException ex) {
                throw new UncheckedIOException(ex);
            }
        });
    } catch (IOException e) {
        e.printStackTrace();
    }
    return outputFile.toArray();
}

The InMemoryOutputFile class is repeated below:

package yanbin.blog;

import org.apache.parquet.io.DelegatingPositionOutputStream;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class InMemoryOutputFile implements OutputFile {
    private final ByteArrayOutputStream baos = new ByteArrayOutputStream();

    @Override
    public PositionOutputStream create(long blockSizeHint) throws IOException {
        return new InMemoryPositionOutputStream(baos);
    }

    @Override
    public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
        return null;  // unused here: the writer is built with Mode.CREATE, which calls create()
    }

    @Override
    public boolean supportsBlockSize() {
        return false;
    }

    @Override
    public long defaultBlockSize() {
        return 0;
    }

    public byte[] toArray() {
        return baos.toByteArray();
    }

    private static class InMemoryPositionOutputStream extends DelegatingPositionOutputStream {
        public InMemoryPositionOutputStream(OutputStream outputStream) {
            super(outputStream);
        }

        @Override
        public long getPos() throws IOException {
            // Position equals the number of bytes written so far
            return ((ByteArrayOutputStream) this.getStream()).size();
        }
    }
}
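
As a quick sanity check, the resulting byte array can also be read back entirely in memory. The InMemoryInputFile class below is not from the original post; it is a minimal sketch of an org.apache.parquet.io.InputFile backed by the byte array, paired with AvroParquetReader from the same parquet-avro 1.11.1 dependency, and the reading fragment assumes an avroObjects list like the one passed to the methods above:

import org.apache.parquet.io.DelegatingSeekableInputStream;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;

import java.io.ByteArrayInputStream;
import java.io.IOException;

// Hypothetical counterpart to InMemoryOutputFile, for reading Parquet from a byte array
public class InMemoryInputFile implements InputFile {
    private final byte[] data;

    public InMemoryInputFile(byte[] data) {
        this.data = data;
    }

    @Override
    public long getLength() {
        return data.length;
    }

    @Override
    public SeekableInputStream newStream() {
        ByteArrayInputStream in = new ByteArrayInputStream(data);
        return new DelegatingSeekableInputStream(in) {
            @Override
            public long getPos() {
                return data.length - in.available();  // current offset within the array
            }

            @Override
            public void seek(long newPos) throws IOException {
                in.reset();        // back to the start of the array
                in.skip(newPos);   // then forward to the requested offset
            }
        };
    }
}

// Reading the in-memory Parquet bytes back as generic records (e.g. inside a test):
byte[] parquetBytes = writeToParquetByteArray(avroObjects);
try (ParquetReader<GenericRecord> reader =
         AvroParquetReader.<GenericRecord>builder(new InMemoryInputFile(parquetBytes))
             .withDataModel(GenericData.get())
             .build()) {
    GenericRecord record;
    while ((record = reader.read()) != null) {
        System.out.println(record);
    }
}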

Going forward, try to stick with the hadoop-common library.
