5

0基础就可以上手的Spark脚本开发-for Java - LingBrown

 2 years ago
source link: https://www.cnblogs.com/lbhym/p/16483798.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

0基础就可以上手的Spark脚本开发-for Java

最近由于工作需要,要分析大几百G的Nginx日志数据。之前也有过类似的需求,但那个时候数据量不多。一次只有几百兆,或者几个G。因为数据都在Hive里面,当时的做法是:把数据从Hive导到MySQL,然后写代码查询MySQL并处理。如果你的处理逻辑比较简单,或只是查询统计,不会涉及上游的服务调用,也可以直接写Hive SQL。

上面的做法在面对少量数据时还可以应付,对于大量数据就很不可取了。从Hive导数据到MySQL,光这一步就够呛,就更别说自己写的Java脚本效率性能如何了。请教同事过后,告诉我可以用Spark,并潇洒地丢给我一个Spark-Demo的jar包。之前只接触过HDFS和Hive,Spark只听说过,也准备学,但一直没时间。这下好了,有了带薪学习的机会。其实照着同事给我的jar包,照葫芦画瓢也能写出来,但是很多API都非常陌生,写出来的代码自己也不放心,所以还是有必要学学Spark的。

不过从头开始,完整学一遍Spark的话,时间肯定不够。当时接需求时,虽然知道自己不会,但是还挺相信自己的学习能力的,承诺了开发时间。所以我们的目标就是——用Spark处理Hive里面的数据,并把结果输出到MySQL中。

学习一个新知识的正常路径是:了解产生背景、了解整体架构、分模块学习功能和了解API、实战、深入学习原理和优化。由于这次目的性很强,在第三步时,只用学习跟本次需求相关的模块即可,然后就可以实战了。先从以下两个问题入手,初步了解Spark。

  • 可以用Spark做什么?
    • 并行处理分布在集群中的大规模数据集。(✅)
    • 执行交互式查询语句来探索数据集并进行数据集可视化。
    • 使用 MLlib 构建、训练,以及评估机器学习模型。
    • 使用各种数据流实现端到端的数据流水线。
    • 分析图数据和社交网络。

我们本次的目标就是用Spark处理大规模的数据集。

  • 为什么选择Spark而不是MR?

    • Spark 为中间计算结果提供了基于内存的存储,这让它比 Hadoop MR 快了很多。Spark还整合了各种上层库,比如用于机器学习的库 MLlib、提供交互式查询功能的Spark SQL、支持操作实时数据的流处理库 Structured Streaming,以及图计算库GraphX。这些库都提供了易用的 API。

初步了解Spark

Spark支持 Scala、Java、Python、SQL 和 R 等编程语言。其提供了大量模块化功能,可以适用于各种场景。其中包括 Spark SQL、Spark Structured Streaming、Spark MLlib,以及 GraphX 等模块。模块化带来的好处就是扩展性高,Spark 的重心在于快速的分布式计算引擎,而不是存储。和 Apache Hadoop 同时包含计算和存储不同,Spark 解耦了计算和存储。这意味着你可以用 Spark 读取存储在各种数据源(Apache Hadoop、Apache Cassandra、Apache HBase、MongoDB、Apache Hive、RDBMS 等)中的数据,并在内存中进行处理。你还可以扩展 Spark 的 DataFrameReader 和 DataFrameWriter,以便将其他数据源(如 Apache Kafka、Kinesis、Azure 存储、亚马逊 S3)的数据读取为DataFrame 的逻辑数据抽象,以进行操作。

1383122-20220716173127563-720620451.png

Spark 提供了一种称作 RDD(resilient distributed dataset,弹性分布式数据集)的简单逻辑数据结构,它是 Spark 最基本的抽象。Spark 各种其他高级的结构化数据抽象(比如 DataFrame 和 Dataset)都是基于 RDD 构建的。

RDD 是 Spark 最基本的抽象。RDD 关联着三个至关重要的属性:

  • 依赖关系:告诉Spark如何从输入中构建RDD,Spark 可以根据这些依赖关系重新执行操作,以此重建出 RDD。这一属性赋予了 RDD 容错的弹性。
  • 分区:分区允许 Spark 将工作以分区为单位,分配到多个执行器上进行并行计算。
  • 计算函数:就是操作RDD的函数,可以生成RDD 所表示数据的Iterator[T] 对象。

RDD的操作可以分为转化操作行动操作。顾名思义,转化操作就是将 Spark DataFrame 转化为新的 DataFrame,而不改变原有数据的操作。比如select()、filter()这样的操作,不会改变原有数据,这些操作只会将转化结果作为新的 DataFrame 返回。一般转化操作后,会迎来一个行动操作。比如通过filter()过滤数据,最后通过count()统计过滤后的数据。这个count()就是行动操作。

1383122-20220716173142536-50372216.png

上面提到了DataFrame,它是一个结构化、有格式的,且支持一些特定操作的数据集。就像分布式内存中的表一样,每列都有名字,有表结构定义,每列都有特定的数据类型。

实战Demo

引入Jar包,这里导入的版本不是很高,是因为公司的Spark集群也是2.3版本的,要跟你安装的Spark版本保持一致。

<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.11.8</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.3.2</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.11</artifactId>
  <version>2.3.2</version>
  <scope>provided</scope>
</dependency>

下面代码中有必要的注释,带序号的注释会在代码之后会展开说说。

public class SparkDemo {
    //数据库相关配置
    private static final Properties connectionProperties = new Properties();
    private static final String HIVE_DATABASE  = "****";
    private static final String HIVE_TABLE_NAME = "****";
    private static final String JDBC_URL = "****";
    private static final String MYSQL_TABLE_NAME = "****";

    static {
        connectionProperties.put("user","*****");
        connectionProperties.put("password","*****");
        connectionProperties.put("driver","com.mysql.jdbc.Driver");
    }

    public static void main(String[] args) {
        String dt = args[0];
        //1.SparkSession是所有功能的入口,创建好后就可以用它的API来执行操作了
        SparkSession sparkSession = SparkSession.builder()
                .appName("SparkDemo")
                .config("spark.driver.maxResultSize", "3g")
                .enableHiveSupport()
                .getOrCreate();

        String sqlText = String.format("select host,url,uri,res_data,dt from %s.%s where dt=%s", HIVE_DATABASE, HIVE_TABLE_NAME, dt);
        //执行SQL并创建分区
        Dataset<Row> sql = sparkSession.sql(sqlText).repartition(8);
        //2.RDD转为JavaRDD
        JavaRDD<Row> dataRows = sql.toJavaRDD();

        //3.以分区的模式遍历数据集
        JavaRDD<Object> scanResultJavaRDD = dataRows.mapPartitions((FlatMapFunction<Iterator<Row>, Object>) rowIterator -> {
            List<Object> list = new ArrayList<>();
            Row row;
            while (rowIterator.hasNext()) {
                row = rowIterator.next();
                String host = row.getString(0);
                String url = row.getString(1);
                String uri = row.getString(2);
                String res_data = row.getString(3);
                //处理逻辑
            }
            return list.iterator();
        });
        writeToMySQL(sqlContext,scanResultJavaRDD);
        sparkSession.stop();
    }

    //4.使用SQLContext提供的API读写数据库,不只是MySQL,支持JDBC就行
    private static Dataset<Row> readMySQL(SQLContext sqlContext,String uri){
         return sqlContext.read().jdbc(JDBC_URL, MYSQL_TABLE_NAME, connectionProperties)
                .select("*")
                .where("uri=" + uri)
                .limit(1000);
    }

    private static void writeToMySQL(SQLContext sqlContext,JavaRDD<Object> resultRDD){
        sqlContext.createDataFrame(resultRDD,Object.class).write().mode(SaveMode.Append).jdbc(JDBC_URL,MYSQL_TABLE_NAME,connectionProperties);
    }
}
  1. 在 Spark 2.0 中,SparkSession 是所有 Spark 操作和数据的统一入口。它不仅封装了 Spark 程序之前的种入口(如 SparkContext、SQLContext、HiveContext、SparkConf,以及 StreamingContext 等)。所以,如果你在网上搜索过Spark的代码,可能会看见把SparkSession转换为SQLContext,在2.x及之后的版本中就不需要了。
  2. RDD和JavaRDD没有实质上的区别,只是Spark针对Java单独编写的一套API,如果你是用Scala写的,就没有这一步。
  3. 除了mapPartitions(),还有一个map()。它们都是对RDD中每个元素进行操作的API,它们的区别从名字也可以看出。mapPartitions()是针对RDD每个分区中的元素进行操作。代码中存在一个小问题,就是我会把处理结果存进list然后返回。因为我的逻辑会过滤绝大部分数据,也许10w条数据最终会留下几十条数据。如果你处理过后的数据量还非常大,不建议返回,有可能OOM,建议在mapPartitions()内部读写数据,不用返回数据。这时候mapPartitions()又体现出来了,如果使用map(),可能每个元素都会与数据库建立一次connection,而mapPartitions()一个分区会共用一个connection。
  4. 读写数据库的API看起来非常清晰,不用解释太多。其中createDataFrame(resultRDD,Object.class)会创建一个DataFrame,resultRDD是RDD,Object.class单个元素的数据结构。这里只是演示,实际可以是你自己定义的实体类。

上面这个Demo只能演示一部分功能,反正它满足我的需求。有可能不太满足你的需求,可以去看看官方的文档:https://spark.apache.org/docs/3.3.0/sql-getting-started.html 。更多的读写数据方式和操作API基本都有。

参考资料:《Spark快速大数据分析 第二版》、官方文档


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK