
Hands-On | Writing an Apache Hudi Dataset to Alibaba Cloud OSS

source link: http://www.cnblogs.com/leesf456/p/12773101.html

1. Introduction

The low cost of cloud object storage has led many companies to adopt it as their primary storage layer, and for a data lake solution such as Hudi, supporting object storage is essential. AWS EMR already ships with Hudi built in, which means Hudi can be used seamlessly on S3. Users in China, however, are more likely to use Alibaba Cloud OSS as their cloud storage, so can a data lake be built on OSS with Hudi? The PR adding OSS support has been merged into Hudi's master branch, so you can build from master now (or wait for the next release), and after a little configuration data can be written to OSS.

2. Configuration

2.1 POM dependencies

The main additional POM dependencies are as follows:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-aliyun</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>com.aliyun.oss</groupId>
    <artifactId>aliyun-sdk-oss</artifactId>
    <version>3.8.1</version>
</dependency>

2.2 core-site.xml configuration

To access OSS, core-site.xml needs to be modified; the key configuration is as follows:

<property>
  <name>fs.defaultFS</name>
  <value>oss://bucketname/</value>
</property>

<property>
  <name>fs.oss.endpoint</name>
  <value>oss-endpoint-address</value>
  <description>Aliyun OSS endpoint to connect to.</description>
</property>

<property>
  <name>fs.oss.accessKeyId</name>
  <value>oss_key</value>
  <description>Aliyun access key ID</description>
</property>

<property>
  <name>fs.oss.accessKeySecret</name>
  <value>oss-secret</value>
  <description>Aliyun access key secret</description>
</property>

<property>
  <name>fs.oss.impl</name>
  <value>org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem</value>
</property>
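
If editing core-site.xml is not convenient, the same properties can presumably also be set programmatically on the Hadoop configuration of the SparkSession before any OSS path is accessed (for example at the top of main() in the demo below, right after the session is created). This is only a minimal sketch using the same placeholder endpoint and credentials as the XML above:

// Same placeholder endpoint and credentials as in core-site.xml above; replace with your own values.
org.apache.hadoop.conf.Configuration hadoopConf = spark.sparkContext().hadoopConfiguration();
hadoopConf.set("fs.oss.endpoint", "oss-endpoint-address");
hadoopConf.set("fs.oss.accessKeyId", "oss_key");
hadoopConf.set("fs.oss.accessKeySecret", "oss-secret");
hadoopConf.set("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");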

3. Source code

The example source code is as follows:

import org.apache.hudi.QuickstartUtils.*;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.IOException;
import java.util.List;

import static org.apache.hudi.QuickstartUtils.convertToStringList;
import static org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs;
import static org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME;
import static org.apache.spark.sql.SaveMode.Overwrite;

public class OssHudiDemo {
    public static void main(String[] args) throws IOException {
        SparkSession spark = SparkSession.builder().appName("Hoodie Datasource test")
                .master("local[2]")
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .config("spark.io.compression.codec", "snappy")
                .config("spark.sql.hive.convertMetastoreParquet", "false")
                .getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        String tableName = "hudi_trips_cow";
        // With fs.defaultFS pointing at the OSS bucket, this path resolves to a location inside the bucket.
        String basePath = "/tmp/hudi_trips_cow";
        DataGenerator dataGen = new DataGenerator();

        // Generate 10 sample trip records and write them to OSS as a Hudi table.
        List<String> inserts = convertToStringList(dataGen.generateInserts(10));
        Dataset<Row> df = spark.read().json(jsc.parallelize(inserts, 2));
        df.write().format("org.apache.hudi").
                options(getQuickstartWriteConfigs()).
                option(TABLE_NAME, tableName).
                mode(Overwrite).
                save(basePath);

        // Read the table back from OSS and query it through Spark SQL.
        Dataset<Row> roViewDF = spark.read().format("org.apache.hudi").load(basePath + "/*/*/*");
        roViewDF.registerTempTable("hudi_ro_table");
        spark.sql("select * from hudi_ro_table").show(false);
        spark.stop();

    }
}
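
Note that basePath above is not a fully qualified OSS URI; because fs.defaultFS in core-site.xml points at the OSS bucket, /tmp/hudi_trips_cow resolves to a path inside that bucket. If you prefer not to change fs.defaultFS, the base path can presumably be given as an explicit OSS URI instead; the line below is only an illustration and "bucketname" is a placeholder:

// Hypothetical fully qualified base path; "bucketname" stands in for your own OSS bucket.
String basePath = "oss://bucketname/tmp/hudi_trips_cow";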

That is, the data is first written to OSS; the screenshot below shows that the data has been written to the OSS bucket successfully, and the written result is then queried back through Spark.

[Screenshot: Hudi data files written to the OSS bucket]

Part of the query result is shown below:

|20200421205942     |20200421205942_2_10 |6fd496f8-ebee-4f67-8f86-783ff3fed3ab|asia/india/chennai                  |1f71bed9-833b-4fca-8b4b-4cd014bdf88a-0_2-22-30_20200421205942.parquet|0.40613510977307   |0.5644092139040959 |driver-213|0.798706304941517  |0.02698359227182834|17.851135255091155|asia/india/chennai                  |rider-213|0.0|6fd496f8-ebee-4f67-8f86-783ff3fed3ab|

All source code has been uploaded to https://github.com/leesf/oss-hudi-demo

4. Conclusion

This article is intentionally short and only demonstrates how to write data to OSS with Hudi. Once the data lands in OSS, it can be consumed by nearly every product on Alibaba Cloud, which makes data lake analytics on the Alibaba Cloud stack very straightforward; for example, DLA (Data Lake Analytics, Alibaba Cloud's counterpart to AWS Athena) can run analytical queries over the Hudi dataset, and the integrated pipeline makes analysis remarkably simple.

