3

Spark学习笔记

 2 years ago
source link: https://taodaling.github.io/blog/2021/07/08/spark%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Spark基础

Spark是一种软件框架,用于管理和协调多台计算机上的计算任务。

我们通过向集群管理器提交Spark应用,而集群管理器会为应用分配足够的计算资源。

一个Spark应用由一个驱动器进程和一组执行器进程组成。驱动器进程运行应用的main函数,负责:

  • 维护Spark应用的信息
  • 回应用户的程序或输入
  • 分析任务并分发给若干执行器进行处理

执行器只负责执行由驱动器分配给它的代码,并向驱动器报告计算状态。

SparkSession

驱动器对应的类型为SparkSession,每个Spark应用都需要创建一个SparkSession。在交互模式中会自动创建一个名为spark的SparkSession对象。

结构化数据

DataFrame和Dateset数据类型代表一张结构化的分布式表,有若干个命名列以及若干条记录,每个列都有相同的数据类型。DataFrame是无类型的,它的记录类型都是Row类型,它的模式校验发生在运行期;Dataset仅在JVM语言上被支持,它允许编译期的类型校验,它的记录类型都是特定的Java Bean类型。

数据转换和动作

为了让多个执行器并发执行,Spark会将数据分解为多个数据块,每个数据块称为分区。

Spark中的数据在计算过程中是不变的,你可以通过转换操作从某个数据集生成一个新的数据集。

需要注意的是转换操作是惰性的,只有在你执行动作操作的时候,才会发生真正转换。对于用户表达的一系列转换操作,Spark会在真正需要的时候,建立从原始数据到目标数据的一套转换计划,并将其编译为可以在集群中运行的流水线式的物理执行计划,这时候由于信息充分,所以Spark可以加入很多优化,比如把过滤器提前执行从而减少处理的数据量。

如果一个转换仅生成一个新的分区,那么称为窄转换。如果转换生成多个新的分区,那么称为宽转换,也称为洗牌(shuffle)。窄转换的时候得到的数据会保留在同一个执行器的内存中,而宽转换得到的分区会写入到磁盘上并在多个执行器之间交换。

动作操作会触发真正的计算过程,动作操作会根据转换后的数据计算最终结果。比如count统计数据的行数。

由于Spark会先将转换流程编译成物理执行计划,之后进行执行,因此很自然地Spark可以支持SQL查询。 由于最终都是编译后执行,因此SQL查询和指定转换计划拥有相同的性能。

要进行SQL查询,需要将DataFrame注册成为临时视图。

flightData2015.createOrReplaceTempView("flight_data_2015")

之后就可以执行sql查询。

sqlWay = spark.sql("""
select DEST_COUNTRY_NAME, count(1) 
from flight_data_2015
group by DESCT_COUNTRY_NAME
""")

我们首先通过控制台或以Spark作业的方式提交我们的应用程序。

第一阶段,我们的用户代码会被转换为逻辑计划。

  1. 用户代码会被转换为未解析的逻辑计划。
  2. Spark利用catalog在分析器中解析逻辑计划出现的表名和列名。
  3. Catalyst优化器优化逻辑计划。

第二阶段,会选择合适的物理执行计划,以最小代价在集群上执行我们的逻辑计划。

  1. 生成多个不同的物理执行计划
  2. 通过代价模型对物理执行计划进行比较
  3. 选择合适的物理执行计划

第三阶段,Spark将所有代码运行在Spark的底层编程接口RDD上。最终结果会被返回给用户。

Schema代表DataFrame的模式(元数据),Schema定义了表的列名,列的类型。

模式可能来自数据源,这称为读时模式(schema-on-read),也可以我们自行定义模式。

Spark SQL

Spark SQL中最高级别的抽象是Catalog。Catalog负责存储用户数据中的元数据。

在编程的时候,可以通过spark.sql方法执行具体的sql代码。

数据表在逻辑上与DataFrame是相同的,只不过DataFrame是定义在编程语言中,而数据表定义在数据库中。

数据表是实际存储数据的,而视图则是不存储数据的。

如果表的实际数据由Spark管理,并且数据会在表删除的时候被清除,那么这种表称为托管表,否则称为非托管表。比如你定义一组文件为一个数据表的时候,这个表就是非托管表;在DataFrame上使用saveAsTable函数来创建一个数据表的时候,它实际上是托管表。

你需要在数据库上下文操作表数据。默认情况下使用的数据库是default。

Spark组件

driver

客户端使用driver提交spark应用。spark driver负责创建spark context,并将所有转换操作编译为一个DAG,DAG中包含tasks和stages,tasks是spark程序的最小可调度工作单位,而stages表示一组可以一起运行的tasks,不同的stages之间往往有相互依赖关系。

executor

executor是用于执行tasks的宿主,它负责保留需要的CPU和内存资源。executor会在spark应用开始时被创建,并在结束后被销毁,一个executor可以用于执行数以百计的tasks。

一个worker结点的资源是有限的,因此一个结点上只能运行固定数量的executor,而一个集群中的worker结点也是有限的,因此一个集群能运行的executor也是有限的。

master

master是一个进程,负责为driver从集群中请求资源,并且跟踪集群中结点的状态和进度。

Cluster Manager

cluster manager是一个进程,负责管理、监控工作结点,并按照master的要求保留资源。

记录级别API与声明式API

记录级别API是指将每个事件传递给应用程序处理。早期的流处理系统一般会使用这种方式。这种方式的好处是给了应用程序很大的灵活性,并且流处理系统的逻辑比较简单。缺点是应用程序需要管理复杂的状态,开发难度较大。

而声明式API是指应用程序只指定具体的计算步骤,而不需要考虑中间的流程,也不需要考虑如何从失败中恢复。比如仅提供map、reduce接口。流处理系统会自动保存相关状态并从失败中恢复。

连续处理与微批次处理

连续处理模式是指数据一旦处理完成就直接发送给下游,这样做的优点是拥有很低的延时,但是吞吐量会比较低。而微批次处理是指处理完的数据会等待一段时间(比如1s),之后会同一批次发送这个时间段所有加工完成的数据,这种模式拥有适当的延迟,但是会大幅提高吞吐量。

Spark目前选择支持微批次处理模式。

事件时间与处理时间

事件时间是指根据数据源插入记录时间来处理数据,而不是根据流处理应用程序接受数据的时间(处理时间)。

如果选择使用事件时间,那么由于延迟的存在,记录可能会以乱序到达我们的流处理系统。这时候需要指定一个时间窗口,以及时间窗口的最大超时时间(水位)。

结构化流处理

结构化流处理是指以流处理方式处理DataFrame。其背后的思想是将数据流视为连续追加的数据表,然后作业定期检查新的输入数据,对其进行处理,在需要的时候更新内部状态。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK