第一部分:基石 – 解构 Spark 的分布式计算核心
第一章:超越单机之限:深入 Spark 分布式计算的本质
1.1 为何选择 Spark:从单机瓶颈到分布式革命
我们首先需要理解一个根本性的问题:我们为什么需要像 Spark 这样的工具?答案源于一个物理现实——单台计算机的局限性。在数据科学的日常工作中,我们钟爱的 pandas 库在一台性能优越的机器上可以轻松处理数百万行的数据。然而,当数据规模从 GB 级别跃升至 TB 甚至 PB 级别时,物理定律开始成为不可逾越的障碍。
内存瓶颈 (Memory Bottleneck): 当一个数据集的大小超过了计算机的物理内存(RAM)时,操作系统会开始使用虚拟内存,即把硬盘空间当作内存使用。硬盘的读写速度比内存慢数个数量级(机械硬盘慢 10^5 倍,固态硬盘慢 10^3 倍),这会导致数据处理性能急剧下降,甚至程序崩溃。你无法将一个 100GB 的文件完整读入一个只有 32GB 内存的机器中进行操作。
CPU 瓶颈 (CPU Bottleneck): 即便数据能放入内存,计算的复杂度也可能成为瓶颈。一个复杂的机器学习训练任务,或是一个需要对海量数据进行全量扫描的聚合操作,在单个 CPU(即使是多核 CPU)上可能需要运行数小时甚至数天。CPU 的核心数量和时钟频率存在物理和功耗的上限,无法无限增长。
分布式计算,正是为了打破这些单机瓶颈而生。其核心思想非常朴素:如果一台机器解决不了问题,那就用很多台机器一起来解决。
Spark 的前身,Hadoop MapReduce,是分布式计算的第一次伟大尝试。它将计算任务分解为 Map 和 Reduce 两个阶段,并能在数千台廉价商用服务器上并行执行,实现了对海量数据的处理能力。但 MapReduce 有其固有的缺陷:
高延迟的磁盘 I/O: MapReduce 的每一个计算阶段都会将中间结果写入分布式文件系统(HDFS)。这种设计保证了极高的容错性,但也带来了大量的磁盘读写,对于需要多步骤迭代计算的场景(如机器学习算法)性能极差。
编程模型僵化: 开发者被严格限制在 Map 和 Reduce 的编程范式中,实现复杂的逻辑非常繁琐。
Apache Spark 的诞生,正是对 MapReduce 的一场革命。它继承了 MapReduce 的分布式、可扩展、容错的思想,但通过引入两个颠覆性的概念,彻底改变了分布式数据处理的游戏规则:
基于内存的计算 (In-Memory Computing): Spark 允许将中间计算结果存储在参与计算的各个节点的内存中,而不仅仅是磁盘上。对于需要反复使用的数据集,这可以带来高达 100 倍的性能提升。这使得迭代式算法(如梯度下降)和交互式数据分析成为可能。
有向无环图 (Directed Acyclic Graph – DAG): Spark 将一系列的计算操作构建成一个 DAG。它并不像 MapReduce 那样在每一步后都固化结果,而是将整个计算流程看作一个图。只有当需要一个最终结果(一个“Action”操作)时,Spark 才会根据这个图制定最优的执行计划,并将任务分发到集群执行。这种“惰性求值”(Lazy Evaluation)的策略为大量的自动优化提供了可能。
因此,选择 Spark,并非仅仅是选择了一个“更快的工具”,而是选择了一种全新的、能够将成百上千台计算机的计算资源和存储资源凝聚成一个统一的、强大的“超级计算机”的思维范式和编程模型。PySpark 作为 Spark 的 Python API,则成功地将这种强大的分布式能力与 Python 语言的简洁、易用和丰富的生态系统结合在一起,成为了现代大规模数据分析和机器学习领域不可或缺的基石。
1.2 Spark 生态系统全景:核心组件与应用领域
Spark 并不是一个单一的工具,而是一个由多个紧密集成的组件构成的强大生态系统。理解每个组件的角色和定位,是有效利用 Spark 解决不同领域问题的基础。
Spark Core: 这是整个 Spark 项目的基石。它提供了 Spark 的核心功能,包括任务调度、内存管理、故障恢复、与存储系统的交互等。Spark Core 的核心数据抽象是弹性分布式数据集 (Resilient Distributed Dataset – RDD),它是后续所有高级 API 的基础。本指南的第二部分将深入剖析 RDD 的内部机制。
Spark SQL: 这是 Spark 用于处理结构化数据的模块。它在 Spark Core 之上提供了两种核心抽象:DataFrame 和 Dataset(在 PySpark 中,DataFrame 是主要的 API)。Spark SQL 允许你像使用 SQL 查询关系型数据库一样查询 Spark 中的数据,也支持直接从多种数据源(如 Hive、Parquet、JSON、CSV)读取。更重要的是,Spark SQL 包含一个名为 Catalyst 的高级查询优化器,它能够自动将用户的查询(无论是 SQL 语句还是 DataFrame 操作)优化成最高效的物理执行计划。这是 Spark 高性能的关键所在,我们将在第三部分详细探讨。
Spark Streaming (及 Structured Streaming): 这是 Spark 提供的用于处理流数据的组件。传统的 Spark Streaming(基于 DStream)将实时数据流切分成一系列小的批次(micro-batch),然后使用 Spark Core 引擎进行处理。而更现代的 Structured Streaming 则是构建在 Spark SQL 引擎之上,将数据流看作是一张“无边界的表”。你可以像对静态表一样对数据流执行查询,Spark 会负责在有新数据到达时增量地更新结果。这极大地简化了流处理应用的开发。
MLlib (Machine Learning Library): 这是 Spark 的机器学习库。它提供了大量的常用机器学习算法,这些算法都被设计为可以在集群上并行执行。MLlib 包含分类、回归、聚类、协同过滤等多种算法,以及特征提取、转换、降维和选择等工具。MLlib 利用 Spark 的迭代计算能力,在处理大规模训练数据时表现出色。
GraphX (及 GraphFrames): 这是 Spark 用于图计算的 API。GraphX 引入了带属性的图数据结构(Property Graph),并提供了一系列图计算操作符,如 pregel、connectedComponents、pageRank 等。虽然 Spark 的图计算能力不如专门的图数据库(如 Neo4j),但对于需要在已有的大规模数据集上进行图分析和挖掘的场景,GraphX 提供了一个方便且强大的解决方案。GraphFrames 是一个基于 DataFrame 的更新、更友好的图 API。
PySpark 的角色:
PySpark 并非一个独立的组件,而是整个生态系统的 Python 语言接口。它通过一个名为 Py4J 的库,让 Python 代码能够调用 Spark 底层的 Java/Scala 对象和方法。这意味着,当你使用 PySpark 编写代码时,你实际上是在 Python 进程(Driver 端)中构建计算逻辑,然后 PySpark 将这些逻辑序列化并发送到在 Java 虚拟机(JVM)中运行的 Spark Executor 上去执行。理解这一点对于后续进行性能调优(特别是关于 UDF 的部分)至关重要。
1.3 核心架构剖析:Driver、Executor 与任务的分布式之旅
为了真正掌握 PySpark,我们必须揭开其运行时的面纱,理解一个 Spark 应用(Application)是如何在集群中启动和执行的。其核心架构由三个关键角色构成:驱动器(Driver)、执行器(Executor)和集群管理器(Cluster Manager)。
核心角色定义:
驱动器 (Driver): 这是运行你的 main() 函数的进程。当你通过 spark-submit 提交一个 PySpark 脚本时,Driver 进程就会被启动。它的核心职责是:
创建 SparkContext: SparkContext 是与 Spark 集群通信的入口点,负责连接到集群管理器并申请计算资源。
将用户代码转化为任务: Driver 会将你的代码(RDD 的转换操作)解析成一个逻辑执行计划(DAG)。
生成物理执行计划: 当一个 Action 操作被触发时,DAG 调度器(DAGScheduler)会将逻辑计划(DAG)分割成一组阶段(Stages)。每个 Stage 包含了一系列可以并行执行、且没有数据混洗(Shuffle)依赖的任务(Tasks)。
任务调度: Driver 会将这些任务发送到集群中的 Executor 上去执行。
跟踪执行状态: Driver 会持续跟踪每个任务的执行情况,并在任务失败时进行重试。
执行器 (Executor): 这是在集群的工作节点(Worker Node)上为某个 Spark 应用启动的一个工作进程。每个 Executor 都拥有自己独立的 JVM 和一块内存。它的核心职责是:
执行任务: Executor 接收来自 Driver 的任务,并在其内部的线程池中执行。一个任务处理 RDD 的一个分区(Partition)。
存储数据: 如果用户代码中包含 cache() 或 persist() 操作,Executor 会负责将 RDD 的分区数据存储在内存或磁盘上。
返回结果: 将任务的计算结果返回给 Driver。
集群管理器 (Cluster Manager): 这是负责在集群中分配和管理资源的外部服务。Spark 本身不包含资源管理能力,它需要依赖一个集群管理器。常见的有:
Standalone: Spark 自带的简单集群管理器,适合学习和小型私有集群。
Apache YARN (Yet Another Resource Negotiator): Hadoop 生态系统中的标准资源管理器,也是生产环境中最常用的选择。
Apache Mesos: 一个更通用的集群资源管理器。
Kubernetes: 近年来日益流行的容器编排系统,也成为了部署 Spark 应用的重要选项。
一个 Spark 作业的生命周期(以 YARN 为例):
提交应用: 你在客户端机器上执行 spark-submit my_script.py。
Driver 启动: spark-submit 会向 YARN 的资源管理器(ResourceManager)申请启动一个容器来运行你的 Driver 进程。
申请 Executor: Driver 内部的 SparkContext 启动后,会向 YARN 的 ResourceManager 再次发出请求,申请一定数量(由配置决定)的 Executor 容器。
Executor 启动: YARN 的 ResourceManager 会在集群中可用的工作节点(NodeManager)上分配容器,并启动 Executor 进程。Executor 启动后会反向注册到 Driver。
任务分发: Driver 开始分析你的代码。当遇到一个 Action 时,它会将计算图(DAG)划分为多个 Stage,每个 Stage 包含多个 Task。然后,Driver 将这些 Task 分发给已经注册的 Executor。
任务执行: 每个 Executor 在其分配到的 CPU 核心上并行执行收到的 Task。每个 Task 处理输入 RDD 的一个分区,并将结果写到内存或本地磁盘。
结果返回/Shuffle: 如果需要 Shuffle(如 reduceByKey),Executor 会将计算的中间结果写出,供下一个 Stage 的 Task 拉取。如果是一个 collect() 这样的 Action,Executor 会将结果直接返回给 Driver。
应用完成: 当所有任务都成功执行完毕,main() 函数退出,Driver 会向 YARN ResourceManager 注销自己,并释放所有容器。应用结束。
通过代码理解 SparkContext 和 SparkSession:
在现代 PySpark (2.x 及以后) 中,我们通常使用 SparkSession 作为入口点。SparkSession 封装了 SparkContext,并提供了 DataFrame 和 Spark SQL 功能的统一入口。
# 导入 SparkSession
from pyspark.sql import SparkSession
def main():
# 1. 创建 SparkSession
# .builder 是一个建造者模式的接口
# .appName() 为你的 Spark 应用设置一个名字,这个名字会显示在 Spark UI 上
# .master() 设置要连接的 Spark 集群。
# - "local" 表示在本地单机上运行,使用一个线程
# - "local[4]" 表示在本地单机上运行,使用 4 个 CPU 核心
# - "spark://<host>:<port>" 连接到一个 Standalone 集群
# - "yarn" 连接到一个 YARN 集群 (需要相应的配置)
# .getOrCreate() 会获取一个已存在的 SparkSession,或者如果不存在,则创建一个新的
spark = SparkSession.builder
.appName("MyFirstPySparkApp")
.master("local[2]")
.getOrCreate()
# 这段代码执行时,Driver 进程就启动了,并且创建了 SparkSession
# SparkContext 仍然可以通过 SparkSession 访问
sc = spark.sparkContext
# sc.getConf().getAll() 可以查看当前 Spark 应用的所有配置
print(f"Spark 应用名称: {
sc.appName}")
print(f"Spark 应用 ID: {
sc.applicationId}")
print(f"Spark 用户: {
sc.sparkUser()}")
print(f"Spark 版本: {
sc.version}")
# 2. 创建一个 RDD (这里使用 SparkContext)
# parallelize 方法可以将一个本地的 Python 集合转换为一个分布式的 RDD
# 第二个参数 2 指定了希望将这个 RDD 分成多少个分区 (Partition)
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data, 2)
# 到这里,只是定义了一个 RDD,由于惰性求值,没有任何计算实际发生
print(f"RDD 的分区数量是: {
rdd.getNumPartitions()}") # 这个操作会触发一个作业来计算分区数
# 3. 对 RDD 执行一个转换 (Transformation) 操作
# map 是一个转换操作,它会对 RDD 中的每个元素应用一个函数
# 这里我们将每个数字乘以 2
doubled_rdd = rdd.map(lambda x: x * 2)
# 同样,由于惰性求值,这一步也只是在 DAG 中增加了一个节点,没有实际计算
# 4. 执行一个动作 (Action) 操作来触发计算
# collect() 是一个动作,它会将 RDD 中的所有数据收集回 Driver 进程,并以 Python 列表的形式返回
# 这个操作会触发一个完整的 Spark 作业
result = doubled_rdd.collect()
# 当 .collect() 被调用时,Driver 会分析整个 DAG (parallelize -> map),
# 生成任务,并发送到 Executor 上去执行。
# Executor 执行 map(lambda x: x*2),然后将结果返回给 Driver。
print(f"计算结果是: {
result}") # 打印最终的列表
# 5. 停止 SparkSession
# 这是一个好习惯,在应用结束时显式地停止会话,以释放资源
spark.stop()
if __name__ == "__main__":
main()
这段简单的代码完整地演示了一个 PySpark 应用的微观生命周期:从创建会话、定义数据和转换,到通过一个 Action 触发真正的分布式计算,最后释放资源。理解这个流程是后续学习所有高级功能和性能调优的基础。
第二部分:理论基石 – RDD 的设计哲学与编程范式
虽然现代 PySpark 数据分析主要围绕 DataFrame API 展开,但深入理解其底层的 RDD (Resilient Distributed Dataset) 却是从“会用”到“精通”的必经之路。RDD 是 Spark 分布式计算抽象的灵魂,它的设计思想直接决定了 Spark 的高性能和容错性。DataFrame 的所有优化,最终也是被编译成高效的 RDD 操作来执行的。
第二章:RDD – Spark 的不可变分布式集合
2.1 RDD 的五大核心属性:深入其“弹性”与“分布式”的本质
RDD,弹性分布式数据集,这个名字本身就蕴含了其核心特性。它并不仅仅是一个分布在多台机器上的数据集合,其设计充满了精巧的工程智慧。一个 RDD 对象,其内部主要由五个核心属性来定义,理解这五个属性,就等于理解了 RDD 的工作原理。
一组分区 (A list of partitions): 这是 RDD “分布式”特性的体现。一个 RDD 在逻辑上是一个完整的数据集,但在物理上,它被切分成多个分区(Partitions)。每个分区都是数据集的一个子集,并被存储在集群中的一个节点上。一个任务(Task)处理一个分区。分区的数量决定了 Spark 作业的并行度。更多的分区意味着可以利用更多的 CPU核心来并行处理,但过多的分区也会带来额外的调度开销。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PartitionExample").master("local[4]").getOrCreate()
sc = spark.sparkContext
# 创建一个有 1000 个元素的 RDD
my_data = range(1000)
# 方式一:在创建时指定分区数
rdd1 = sc.parallelize(my_data, 5) # 将这个 RDD 分成 5 个分区
print(f"rdd1 的分区数: {
rdd1.getNumPartitions()}") # getNumPartitions() 返回 RDD 的分区数量
# 方式二:从文件中读取,分区数由 Spark 决定
# 假设我们有一个名为 'my_text_file.txt' 的文件
# sc.textFile("my_text_file.txt") # 默认分区数通常与 HDFS 块大小相关
# 使用 glom() 来直观地看到分区的内容
# glom() 是一个转换操作,它会将每个分区中的所有元素合并成一个列表
# collect() 会将所有分区的结果(即多个列表)收集回 Driver
partition_contents = rdd1.glom().collect()
print("
rdd1 各个分区的内容:")
for i, partition in enumerate(partition_contents):
# partition 是一个列表,包含了该分区的所有数据
print(f" 分区 {
i}: 包含 {
len(partition)} 个元素,前3个元素是 {
partition[:3]}")
// 打印每个分区的信息,包括其包含的元素数量和前几个元素
spark.stop()
一个作用于每个分区的计算函数 (A function to compute each partition): 这是 RDD 计算的核心。每个 RDD 都包含一个函数,这个函数描述了如何基于其父 RDD 的分区来计算出当前 RDD 的分区。例如,在一个 rdd.map(lambda x: x + 1) 操作中,这个计算函数就是 lambda x: x + 1,它将被独立地应用到 RDD 的每一个分区上。
一组对其他 RDD 的依赖关系 (A list of dependencies on other RDDs): 这是 RDD “弹性”和容错机制的基石。每个 RDD 都明确地知道它是从哪些父 RDD 计算而来的。这种依赖关系被记录在一个**血缘图谱(Lineage Graph)**或 DAG 中。当某个分区的数据丢失时(例如,所在的 Executor 崩溃了),Spark 可以通过这个血缘关系,从原始数据开始,重新计算出丢失的分区,而无需对整个数据集进行备份。
依赖关系分为两种:
窄依赖 (Narrow Dependency): 父 RDD 的每个分区最多只被子 RDD 的一个分区使用。例如 map, filter, union。窄依赖的计算非常高效,可以在单个节点上以流水线(Pipelining)的方式执行,无需等待其他节点。
宽依赖 (Wide Dependency / Shuffle Dependency): 子 RDD 的一个分区可能依赖于父 RDD 的所有分区。例如 reduceByKey, groupByKey, join。宽依赖通常意味着需要在网络间进行数据混洗(Shuffle),这是一个非常昂贵的操作,因为需要将数据在不同节点间进行传输和重组。Spark 会将宽依赖作为 Stage 划分的边界。
一个可选的分区器 (An optional Partitioner for key-value RDDs): 对于键值对(Key-Value)类型的 RDD,比如通过 reduceByKey 生成的 RDD,可以带有一个分区器(Partitioner)。分区器的作用是告诉 Spark 如何根据键(Key)来决定一条数据应该被分配到哪个分区。最常见的是哈希分区器(HashPartitioner),它根据 key.hashCode() % numPartitions 来分配分区。拥有分区器可以极大地优化后续的操作。例如,如果两个 RDD 都使用了相同的哈希分区器,那么对它们进行 join 操作时,就可以避免一次 Shuffle,因为具有相同键的记录已经被预先分配到了相同的节点上。
一个可选的优先位置列表 (An optional list of preferred locations): 这是为了实现数据本地性(Data Locality)而设计的。对于从 HDFS 等分布式文件系统读取数据创建的 RDD,这个列表会记录每个分区数据所在的物理节点位置。当 Spark 调度任务时,它会优先将计算任务分配到数据所在的节点上执行(PROCESS_LOCAL 级别),以避免通过网络传输数据。如果数据所在节点繁忙,它会尝试分配到同机架的其他节点(NODE_LOCAL),最差的情况才是跨机架传输数据(RACK_LOCAL 或 ANY)。
这五大属性共同定义了一个 RDD 的全部信息,使得 Spark 能够以一种高效、容错、可恢复的方式执行分布式计算。
第二章:RDD – Spark 的不可变分布式集合
2.2 转换(Transformations)与动作(Actions):惰性求值的核心机制
Spark 的编程模型优雅而高效,其核心在于对操作的两种截然不同的分类:转换(Transformations)和动作(Actions)。理解这两者的区别,以及它们如何与“惰性求值”(Lazy Evaluation)协同工作,是掌握 PySpark 编程范式的关键。
转换(Transformations):构建计算蓝图
转换操作的本质是:从一个已有的 RDD 创建一个新的 RDD。所有的转换操作都遵循“惰性求值”的原则。当你对一个 RDD 调用一个转换方法时,Spark 并不会立即执行计算。相反,它只是在内部的 DAG(有向无环图)中记录下这个操作,将新的 RDD 作为图的一个新节点,并记录它与其父 RDD 的依赖关系。
可以把转换操作想象成是在绘制一张复杂的计算蓝图。你一步步地告诉 Spark 你想做什么——先过滤数据,再映射转换,然后分组,但 Spark 只是默默地记下你的指令,并不断完善这张蓝图,它并不会去真正地拿起工具开始施工。
常见的转换操作:
map(func): 这是最基础的转换。它将一个函数 func 应用于 RDD 的每一个元素,并返回一个包含新元素的新 RDD。输入分区和输出分区的元素是一一对应的(窄依赖)。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("TransformationMap").master("local").getOrCreate()
sc = spark.sparkContext
data_rdd = sc.parallelize(["hello world", "apache spark", "pyspark is powerful"])
// 创建一个包含字符串的 RDD
# 使用 map 转换,计算每个字符串的长度
lengths_rdd = data_rdd.map(lambda s: len(s))
// lambda s: len(s) 是作用于每个元素的函数
// lengths_rdd 是一个新的 RDD,此时尚未进行任何实际计算
# 打印 RDD 对象本身,而不是其内容
print(lengths_rdd)
// 输出会显示这是一个 PipelinedRDD 或 MapPartitionsRDD,表明它是一个转换的结果
spark.stop()
filter(func): 过滤 RDD 中的元素。它将一个返回布尔值的函数 func 应用于 RDD 的每个元素,只保留那些使函数返回 True 的元素,形成一个新的 RDD(窄依赖)。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("TransformationFilter").master("local").getOrCreate()
sc = spark.sparkContext
numbers_rdd = sc.parallelize(range(20))
// 创建一个从 0 到 19 的 RDD
# 使用 filter 转换,只保留偶数
even_numbers_rdd = numbers_rdd.filter(lambda x: x % 2 == 0)
// lambda x: x % 2 == 0 是一个判断函数,返回 True 或 False
// even_numbers_rdd 是一个新的 RDD,只记录了要进行过滤的这个意图
print(even_numbers_rdd)
spark.stop()
flatMap(func): 与 map 类似,但每个输入元素可以被映射为零个、一个或多个输出元素。func 必须返回一个序列(如列表或元组),flatMap 会将所有返回的序列“压平”成一个单一的 RDD(窄依赖)。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("TransformationFlatMap").master("local").getOrCreate()
sc = spark.sparkContext
sentences_rdd = sc.parallelize(["hello spark", "learning pyspark"])
// 创建一个包含两个句子的 RDD
# 使用 flatMap 将每个句子拆分成单词
words_rdd = sentences_rdd.flatMap(lambda sentence: sentence.split(" "))
// lambda sentence: sentence.split(" ") 会对每个句子返回一个单词列表,如 ['hello', 'spark']
// flatMap 会将 ['hello', 'spark'] 和 ['learning', 'pyspark'] 这两个列表合并成一个包含四个元素的新 RDD
print(words_rdd)
spark.stop()
distinct(): 返回一个包含源 RDD 中所有不重复元素的新 RDD。这是一个宽依赖操作,因为它需要进行 Shuffle 来确保全局的唯一性。
union(otherRDD): 返回一个包含源 RDD 和另一个 RDD 所有元素的新 RDD(窄依赖)。
键值对(Pair RDD)转换: 这类转换操作只能用于由 (key, value) 元组组成的 RDD。
reduceByKey(func): 对具有相同键的值进行聚合。func 接受两个值作为输入,返回一个值。这是一个宽依赖操作。
groupByKey(): 对具有相同键的值进行分组,返回一个新的 RDD,其中每个元素是 (key, an_iterable_of_values)。这也是一个宽依赖操作。通常应优先使用 reduceByKey,因为它在 Shuffle 之前会在每个分区本地进行一次预聚合,大大减少了网络传输的数据量。
sortByKey(): 按键对 RDD 进行排序(宽依赖)。
join(otherRDD): 对两个键值对 RDD 进行内连接(宽依赖)。
动作(Actions):触发计算并返回值
动作操作是整个计算过程的“扳机”。当你对一个 RDD 调用一个动作方法时,Spark 才会真正地开始工作。它会审查已经构建好的 DAG,生成一个优化的物理执行计划,将任务分发到集群上,并开始执行计算。动作操作的返回值不再是一个 RDD,而是一个非 RDD 的类型,比如一个 Python 的原生数据类型(如数字、列表)或者将结果写入外部存储系统。
常见的动作操作:
collect(): 最常用但也最需要小心的动作。它会将 RDD 中的所有元素都收集回 Driver 进程,并以一个 Python 列表的形式返回。这对于调试和查看小数据集的结果非常有用,但如果 RDD 非常大,调用 collect() 会轻易地导致 Driver 内存溢出(OutOfMemoryError),因为 Driver 的内存通常是有限的。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ActionCollect").master("local[2]").getOrCreate()
sc = spark.sparkContext
# 完整的“转换+动作”链条
data = range(100)
rdd = sc.parallelize(data, 4)
# 转换链:过滤 -> 映射
processed_rdd = rdd.filter(lambda x: x > 50).map(lambda x: x * x)
// 这一系列转换操作只是在构建 DAG
# 调用 collect() 动作,触发计算
results = processed_rdd.collect()
// 1. collect() 被调用
// 2. Spark 分析 DAG:parallelize -> filter -> map
// 3. 将任务分发到 4 个 Executor 的核心上
// 4. 每个任务处理一个分区的数据,执行过滤和映射
// 5. 各个 Executor 将其计算结果发送回 Driver
// 6. Driver 将所有结果合并成一个 Python 列表
print(f"大于50的数字的平方是: {
results}")
spark.stop()
count(): 返回 RDD 中的元素数量(一个整数)。
first(): 返回 RDD 的第一个元素(与 take(1)[0] 类似)。
take(n): 从 RDD 中返回前 n 个元素,并以一个列表的形式返回给 Driver。它会尝试只计算尽可能少的分区来满足 n 个元素的需求,比 collect() 更高效。
reduce(func): 通过一个二元函数 func 来并行地聚合 RDD 中的所有元素。func 必须是满足交换律和结合律的,这样才能在分区内和分区间正确地并行计算。
from pyspark.sql import SparkSession
import operator # 导入 operator 模块以使用 add 函数
spark = SparkSession.builder.appName("ActionReduce").master("local").getOrCreate()
sc = spark.sparkContext
numbers_rdd = sc.parallelize(range(1, 101), 4) # 1 到 100,分成 4 个分区
# 使用 reduce 计算 1 到 100 的总和
total_sum = numbers_rdd.reduce(operator.add)
# 1. 每个分区内部会先用 operator.add 进行一次本地求和
# 比如分区1: sum(1..25), 分区2: sum(26..50), ...
# 2. 这 4 个分区的局部和会被发送回 Driver
# 3. Driver 再用 operator.add 将这 4 个局部和相加,得到最终总和
print(f"1到100的总和是: {
total_sum}") # 结果应该是 5050
spark.stop()
foreach(func): 对 RDD 中的每一个元素应用一个函数 func。这个动作通常用于需要对每个元素执行一个带有“副作用”(Side Effect)的操作,比如将数据写入数据库或发送到消息队列。需要注意的是,func 是在 Executor 端执行的,而不是在 Driver 端。
saveAsTextFile(path): 将 RDD 的内容保存为一个文本文件(或一组文件,每个分区一个文件)到指定的路径(如 HDFS 或本地文件系统)。
惰性求值的巨大优势:
自动优化: 因为 Spark 掌握了整个计算的蓝图(DAG),所以它可以在执行前进行大量的优化。例如,它可以将多个连续的 map 操作合并成一个单一的 map 操作(称为流水线操作 Pipelining),减少函数调用的开销。它可以重新安排操作的顺序,将 filter 操作尽可能地提前,以减少需要处理的数据量。
减少数据移动: Spark 会尽量推迟数据的移动(Shuffle),直到绝对必要的时候。如果没有惰性求值,每个宽依赖操作都会立即触发一次全量的数据混洗,而有了惰性求值,Spark 可以分析整个流程,看看是否能通过改变计划来避免或减少 Shuffle。
资源管理: 在一个 Action 被调用之前,Spark 应用实际上没有占用任何计算资源(除了 Driver 本身)。这使得资源管理器可以更有效地调度多个应用。
容错性: 如前所述,DAG 本身就是 RDD 容错机制的核心。
掌握了转换与动作的区别,你就可以开始用 Spark 的思维方式来思考问题:将一个复杂的数据处理任务分解成一系列的转换操作,构建起你的数据处理流水线,最后在需要最终结果的地方,用一个合适的动作操作来触发整个流程的执行。
2.3 键值对 RDD(Pair RDD)的威力与操作
虽然标准的 RDD 可以处理任何类型的 Python 对象,但在大规模数据处理中,一类特殊的 RDD——键值对 RDD(Pair RDD)——扮演着至关重要的角色。一个 Pair RDD 中的每个元素都是一个 (key, value) 形式的元组。几乎所有现实世界的数据聚合、分组和连接任务,都依赖于将数据转化为 Pair RDD 来进行操作。
Spark 为 Pair RDD 提供了一套专属的、高度优化的转换和动作操作,这些操作使得按键进行数据处理变得极其高效和便捷。
如何创建 Pair RDD:
创建 Pair RDD 最常见的方式是使用 map() 转换,将一个普通的 RDD 转换成 (key, value) 的形式。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CreatePairRDD").master("local").getOrCreate()
sc = spark.sparkContext
lines_rdd = sc.textFile("path/to/your/logfile.log") # 假设我们有一个日志文件
# 假设每行日志格式是: "TIMESTAMP LEVEL MESSAGE"
# 例如: "2023-10-27T10:00:00 ERROR Connection failed"
# 我们想按日志级别(LEVEL)进行统计,所以需要将日志级别作为 key
# 使用 map 将每行字符串转换为 (key, value) 对
# key 是日志级别,value 是 1 (用于后续计数)
log_level_rdd = lines_rdd.map(lambda line: (line.split(" ")[1], 1))
# lambda line: (line.split(" ")[1], 1) 会将 "2023-10-27T10:00:00 ERROR Connection failed"
# 转换为 ('ERROR', 1)
# 现在 log_level_rdd 就是一个 Pair RDD
# 我们可以调用它专属的操作,比如 reduceByKey
error_counts = log_level_rdd.reduceByKey(lambda a, b: a + b)
// 这会统计每个日志级别的出现次数
# 触发计算并查看结果
results = error_counts.collect()
print(results) # 可能会输出 [('ERROR', 150), ('INFO', 2000), ('WARN', 30)]
spark.stop()
核心的 Pair RDD 转换操作:
reduceByKey(func, [numPartitions]): 这是 Pair RDD 最重要、最高效的聚合操作。它对具有相同键的值应用一个归约函数 func。与 groupByKey 不同,reduceByKey 会在 Shuffle 数据到网络之前,在每个原始分区上进行一次本地的“预聚合”(Combiner)。这极大地减少了需要通过网络传输的数据量。
reduceByKey 的工作流程图解:
假设我们有两个分区,数据如下:
Partition 1: [('A', 1), ('B', 1), ('A', 1)]
Partition 2: [('A', 1), ('C', 1), ('B', 1)]
Map-Side Combine: reduceByKey 会先在每个分区内部对相同 key 的值进行聚合。
Partition 1 聚合后变为: [('A', 2), ('B', 1)]
Partition 2 聚合后变为: [('A', 1), ('C', 1), ('B', 1)]
Shuffle: 只有聚合后的结果才会被发送到网络上,进行 Shuffle。所有 ‘A’ 的中间结果被发送到一个节点,所有 ‘B’ 的到另一个节点,等等。
发往 ‘A’ 所在分区的数据: [('A', 2), ('A', 1)]
发往 ‘B’ 所在分区的数据: [('B', 1), ('B', 1)]
发往 ‘C’ 所在分区的数据: [('C', 1)]
Reduce: 在目标分区上,再次使用归约函数进行最终的聚合。
‘A’ 分区结果: ('A', 3)
‘B’ 分区结果: ('B', 2)
‘C’ 分区结果: ('C', 1)
groupByKey([numPartitions]): 按键对值进行分组。它返回一个 (key, iterable_of_values) 形式的 RDD。groupByKey 不会进行 Map-Side Combine。它会把所有具有相同键的值都通过网络传输到同一个节点,然后在那里将它们收集到一个迭代器中。如果某个键对应的值非常多,这可能会导致单个节点的内存溢出。因此,如果你能用 reduceByKey 实现的功能,就绝对不要用 groupByKey。
groupByKey 的工作流程图解 (同样的数据):
Partition 1: [('A', 1), ('B', 1), ('A', 1)]
Partition 2: [('A', 1), ('C', 1), ('B', 1)]
Shuffle: 所有原始的键值对都被发送到网络上。
发往 ‘A’ 所在分区的数据: [('A', 1), ('A', 1), ('A', 1)]
发往 ‘B’ 所在分区的数据: [('B', 1), ('B', 1)]
发往 ‘C’ 所在分区的数据: [('C', 1)]
Group: 在目标分区上,将值收集成迭代器。
‘A’ 分区结果: ('A', <iterator for [1, 1, 1]>)
‘B’ 分区结果: ('B', <iterator for [1, 1]>)
‘C’ 分区结果: ('C', <iterator for [1]>)
对比网络传输的数据量,reduceByKey 明显胜出。只有当你需要对一个键的所有值进行一些不能被增量归约的操作时(比如计算中位数),才应该考虑使用 groupByKey。
aggregateByKey(zeroValue, seqFunc, combFunc, [numPartitions]): 这是 reduceByKey 和 groupByKey 的“泛化”版本,也是最灵活的按键聚合函数。它允许你指定一个与输入值类型不同的输出聚合值类型。
zeroValue: 每个键的聚合“零值”或初始值。
seqFunc: 在每个分区内,用于将一个值合并到聚合器中的函数。
combFunc: 在不同分区之间,用于合并两个聚合器的函数。
示例:计算每个用户的平均购买金额
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("AggregateByKeyExample").master("local").getOrCreate()
sc = spark.sparkContext
# (user_id, purchase_amount)
purchase_data = [('user1', 10.0), ('user2', 25.0), ('user1', 5.0), ('user2', 35.0), ('user1', 15.0)]
purchase_rdd = sc.parallelize(purchase_data)
# 我们想计算 (总金额, 总次数),最后再相除得到平均值
# 初始值是 (0.0, 0),即 (sum, count)
zero_value = (0.0, 0)
# seqFunc: 在分区内,如何将一个 purchase_amount 合并到 (sum, count) 累加器中
# (acc_sum, acc_count), value -> (acc_sum + value, acc_count + 1)
seq_func = (lambda acc, value: (acc[0] + value, acc[1] + 1))
# combFunc: 如何合并两个分区的 (sum, count) 累加器
# (acc1_sum, acc1_count), (acc2_sum, acc2_count) -> (acc1_sum + acc2_sum, acc1_count + acc2_count)
comb_func = (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))
# 执行 aggregateByKey
sum_and_counts_rdd = purchase_rdd.aggregateByKey(zero_value, seq_func, comb_func)
# 结果是 (user_id, (total_sum, total_count))
# 例如:('user1', (30.0, 3)), ('user2', (60.0, 2))
# 最后,使用 mapValues 计算最终的平均值
# mapValues 是一个只对 Pair RDD 的 value 部分进行操作的转换,不会引起 Shuffle
average_rdd = sum_and_counts_rdd.mapValues(lambda sum_count: sum_count[0] / sum_count[1])
results = average_rdd.collect()
print(f"每个用户的平均购买金额: {
results}")
# 输出: [('user1', 10.0), ('user2', 30.0)]
spark.stop()
combineByKey(createCombiner, mergeValue, mergeCombiners): 这是 aggregateByKey 的底层实现,也是最底层的按键聚合 API,提供了最大的控制度。三者的关系是:combineByKey -> aggregateByKey -> reduceByKey。
join(other)、leftOuterJoin(other)、rightOuterJoin(other)、fullOuterJoin(other): 用于连接两个 Pair RDD。这些都是宽依赖操作。如果参与连接的两个 RDD 拥有相同的、已知的 Partitioner,Spark 就可以执行一次更高效的“协同定位连接”(Co-located Join),避免对其中一个 RDD 进行 Shuffle。
掌握 Pair RDD 的操作,尤其是深刻理解 reduceByKey, groupByKey, 和 aggregateByKey 之间的区别与联系,是编写高性能、可扩展的 PySpark 数据聚合程序的关键所在。
2.4 持久化之道:cache()、persist() 与 checkpoint() 的艺术
在 Spark 的惰性求值模型中,每个 RDD 都是通过其父 RDD 转换而来的。这意味着,每当一个动作(Action)被触发时,如果某个 RDD 被多次使用,Spark 默认会从头开始重新计算这个 RDD 及其所有的祖先 RDD。对于简单的计算链,这没有问题。但对于复杂的、尤其是迭代式的算法(如机器学习、图算法),反复地重算同一个中间 RDD 会带来毁灭性的性能灾难。
为了解决这个问题,Spark 提供了**持久化(Persistence)**机制,允许用户主动地告诉 Spark:“这个 RDD 很重要,我以后还要多次使用它,请你帮我把它计算一次后,将结果存储起来,以备后续重用。”
2.4.1 cache() 与 persist():将数据保留在计算节点
cache() 和 persist() 是最常用的持久化方法。它们的作用是将一个 RDD 的分区计算出来后,保存在 Executor 节点的内存或磁盘上。
cache(): 这是最简单直接的持久化方法。它实际上是 persist(StorageLevel.MEMORY_ONLY) 的一个别名。它告诉 Spark 将 RDD 的分区数据以反序列化的 Java 对象形式存储在 Executor 的 JVM 堆内存中。
persist(storageLevel): 这是一个更通用的方法,它允许你指定不同的存储级别(Storage Level),从而对持久化策略进行精细的控制。
存储级别(StorageLevel)的深度剖析:
pyspark.StorageLevel 类定义了多种存储策略,每种策略都是在速度、空间效率和可靠性之间做出的不同权衡。
| 存储级别 | 存储位置 | 空间占用 | CPU开销 | 可靠性 | 适用场景 |
|---|---|---|---|---|---|
MEMORY_ONLY |
JVM 内存 | 高 (原生对象) | 低 | 低 (Executor失败则丢失) | 默认选择,数据能完全放入内存时的最快选项。 |
MEMORY_ONLY_SER |
JVM 内存 | 中 (序列化后) | 中 (序列化/反序列化) | 低 | 当内存紧张,但仍希望数据在内存中时。 |
MEMORY_AND_DISK |
内存+磁盘 | 高 | 低 | 高 (内存不足时溢出到磁盘) | 最健壮的选择。速度快,且能处理大于内存的数据。 |
MEMORY_AND_DISK_SER |
内存+磁盘 | 中 | 中 | 高 | 在内存和磁盘都紧张时的健壮选择。 |
DISK_ONLY |
本地磁盘 | 中 | 高 (磁盘I/O+序列化) | 高 | RDD 非常大,且重算成本极高,但访问不频繁。 |
OFF_HEAP |
堆外内存 | 中 | 中 | 高 | 需要精细内存管理,避免 JVM GC 开销的场景。 |
此外,每种级别还有一个复制版本(如 MEMORY_ONLY_2, MEMORY_AND_DISK_2),它们会将每个分区同时存储在两个不同的节点上。这提供了更高的容错性(一个节点挂掉,数据副本仍在),但代价是双倍的存储空间和网络开销。
cache() 的实战:迭代算法性能优化
让我们以一个简化的迭代算法为例,直观地感受 cache() 带来的性能提升。假设我们想通过多次迭代来“增强”一组数值。
import time
from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel
spark = SparkSession.builder.appName("CacheExample").master("local[2]").getOrCreate()
sc = spark.sparkContext
# 创建一个 RDD
initial_rdd = sc.parallelize(range(10_000_000), 8) # 一个有 1000 万个数字的 RDD
def enhance_data(rdd):
"""一个模拟的、计算开销较大的转换函数"""
time.sleep(0.5) # 模拟每个分区的复杂计算
return rdd.map(lambda x: x * 1.01 + 0.5)
# --- 场景一:不使用 cache() ---
print("--- 开始执行迭代计算 (不使用 cache) ---")
start_time_no_cache = time.time()
# 假设这是一个迭代算法,需要多次使用同一个 RDD
current_rdd = initial_rdd
for i in range(5):
# 对 current_rdd 进行一些转换
enhanced = enhance_data(current_rdd)
# 动作:count() 触发计算。每次循环,enhanced RDD 都会从 initial_rdd 开始重新计算
count_result = enhanced.count()
print(f"迭代 {
i+1}: 元素数量 {
count_result}")
# 在这个例子中,我们没有更新 current_rdd,只是为了演示对它的重复使用
# 在真实算法中,可能是 current_rdd = some_function(current_rdd, other_data)
end_time_no_cache = time.time()
print(f"不使用 cache 的总耗时: {
end_time_no_cache - start_time_no_cache:.2f} 秒
")
# --- 场景二:使用 cache() ---
print("--- 开始执行迭代计算 (使用 cache) ---")
# 在第一次转换后,对需要重复使用的 RDD 进行缓存
# cache() 本身也是一个转换操作,是惰性的。它只标记了 RDD 需要被缓存。
cached_initial_rdd = initial_rdd.cache()
# 必须有第一个动作来触发计算和缓存的填充
first_count = cached_initial_rdd.count()
# 这个 count() 会触发 initial_rdd 的计算,并将结果填充到 Executor 的内存中
print(f"第一次触发动作,完成缓存填充,元素数量: {
first_count}")
start_time_with_cache = time.time()
current_rdd = cached_initial_rdd
for i in range(5):
enhanced = enhance_data(current_rdd)
# 动作:count()。这次,enhanced 的计算会直接从内存中读取 cached_initial_rdd 的数据,
# 而不是从头开始计算。
count_result = enhanced.count()
print(f"迭代 {
i+1}: 元素数量 {
count_result}")
end_time_with_cache = time.time()
print(f"使用 cache 的总耗时 (不含首次填充): {
end_time_with_cache - start_time_with_cache:.2f} 秒")
# 使用完毕后,可以手动释放缓存以节约内存
cached_initial_rdd.unpersist()
print("
缓存已释放。")
spark.stop()
在这个例子中,不使用 cache() 的版本,每次循环都会重新计算 initial_rdd,总耗时大约是 5 次 enhance_data 的时间。而使用 cache() 的版本,只有第一次 count() 会触发完整的计算,后续的循环都直接从内存读取数据,速度会快得多。
何时以及如何使用 persist()?
黄金法则: 当你准备对一个 RDD 执行超过一次动作时,就应该考虑对其进行持久化。
选择存储级别: 始终从 persist(StorageLevel.MEMORY_AND_DISK) 开始。这是一个安全且高效的默认选项。只有当你非常确定数据能放入内存,并且对性能要求极致时,才使用 MEMORY_ONLY。当你发现内存成为瓶颈,并且 RDD 中有大量冗余信息时,可以尝试 _SER 版本。
unpersist(): 当你确定不再需要一个持久化的 RDD 时,调用 rdd.unpersist() 是一个好习惯。这会告诉 Spark 释放它占用的内存和磁盘空间,为后续的计算腾出资源。
2.4.2 checkpoint():为超长血缘图谱设置“存档点”
cache() 和 persist() 解决了重算问题,但它们有一个共同的特点:它们不会切断 RDD 的血缘关系(Lineage)。这意味着,即使 RDD 被缓存了,Spark 仍然保留着计算出它的完整 DAG。如果一个 Executor 节点崩溃,它上面缓存的分区就会丢失。Spark 会利用血缘关系,从父 RDD 开始重新计算丢失的分区来恢复数据。
但是,当一个 RDD 的血缘图谱变得极度长且复杂时(例如,经过了上百次转换),这种依赖血缘的容错机制也会成为一个问题:
恢复时间过长: 一旦有分区丢失,重算的链条会非常长,耗时很久。
Driver 内存压力: Driver 需要在内存中维护整个 DAG。过长的 DAG 可能会导致 Driver 本身出现 StackOverflowError。
为了解决这个问题,Spark 提供了 checkpoint()。Checkpoint 是一种更“强硬”的持久化。它的工作机制是:
将 RDD 的数据完整地计算出来。
将计算结果保存到一个可靠的、支持容错的分布式文件系统中(通常是 HDFS)。
切断并丢弃该 RDD 之前的所有血缘关系图谱。这个被 checkpoint 的 RDD,现在成为了一个新的、没有父 RDD 的“根”RDD。
checkpoint() 的使用方法与最佳实践:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CheckpointExample").master("local").getOrCreate()
sc = spark.sparkContext
# 1. 设置 Checkpoint 目录
# 必须在使用 checkpoint() 之前设置一个目录,用于存放 checkpoint 文件
# 这个目录必须是 HDFS 或其他可靠的文件系统路径
# 这里为了本地演示,我们使用一个本地目录
import tempfile
checkpoint_dir = tempfile.mkdtemp()
sc.setCheckpointDir(checkpoint_dir)
print(f"Checkpoint 目录设置为: {
checkpoint_dir}")
# 创建一个 RDD,并经过一系列非常复杂的转换
rdd = sc.parallelize(range(100), 4)
complex_rdd = rdd.map(lambda x: (x % 10, x*x))
.reduceByKey(lambda a, b: a + b)
.flatMap(lambda kv: [kv[1]] * kv[0])
.filter(lambda x: x > 1000)
# 假设这是一个计算成本很高的 RDD,我们不想在每次使用时都重算
# 2. 对 RDD 调用 checkpoint()
# checkpoint() 和 cache() 一样,也是一个惰性操作
complex_rdd.checkpoint()
# 3. 关键!必须触发一个动作来实际执行 checkpoint
# 如果不调用动作,checkpoint 文件是不会被创建的!
# 这是一个非常容易犯的错误。
checkpoint_result = complex_rdd.count()
print(f"第一次触发动作,实际执行了 checkpoint,结果数量: {
checkpoint_result}")
# 4. 验证 checkpoint 是否生效
# isCheckpointed() 方法可以检查一个 RDD 是否已经被 checkpoint
print(f"complex_rdd 是否已 checkpointed: {
complex_rdd.isCheckpointed()}")
# toDebugString() 可以查看 RDD 的血缘关系
# 你会发现,checkpointed 之后,它的依赖关系被切断了
print(f"complex_rdd 的血缘关系:
{
complex_rdd.toDebugString().decode()}")
# 5. 再次使用该 RDD
# 这次它会直接从 checkpoint 目录中读取数据,而不是重算
another_result = complex_rdd.sum()
print(f"再次使用 RDD,计算总和: {
another_result}")
# **最佳实践:cache() 与 checkpoint() 联用**
# 问题:单独调用 checkpoint(),RDD 会被计算两次。
# 第一次:当触发动作时,Spark 从头计算 RDD。
# 第二次:Spark 发现这个 RDD 需要被 checkpoint,于是它启动一个独立的作业,再次从头计算一遍 RDD,并将结果写入 checkpoint 目录。
# 解决方案:
best_practice_rdd = complex_rdd.cache() # 先标记为缓存
best_practice_rdd.checkpoint() # 再标记为 checkpoint
# 然后触发动作
best_practice_rdd.count()
# 工作流程:
# 1. Spark 计算 RDD,因为需要被 cache,所以将结果放入内存。
# 2. Spark 发现 RDD 也需要被 checkpoint,它会直接从内存中读取缓存的数据,然后写入 checkpoint 目录。
# 这样就避免了第二次重算。
spark.stop()
persist() vs. checkpoint() 的终极对比
| 特性 | persist() / cache() |
checkpoint() |
|---|---|---|
| 存储位置 | Executor 的内存和本地磁盘。 | 可靠的分布式存储(如 HDFS)。 |
| 生命周期 | Spark 应用结束时,缓存数据被清除。 | 默认情况下,应用结束后 checkpoint 文件仍然存在,需要手动清理。 |
| 血缘关系 | 保留血缘关系。 | 切断并丢弃血缘关系。 |
| 容错方式 | 依赖血缘关系进行重算。 | 直接从 checkpoint 文件中读取数据。 |
| 执行速度 | 快,主要是内存操作。 | 慢,涉及网络和磁盘 I/O 到 HDFS。 |
| 触发方式 | 第一个动作触发计算和缓存填充。 | 惰性,需要动作触发。单独使用会计算两次,与 cache() 联用是最佳实践。 |
| 核心用途 | 加速对同一 RDD 的迭代式访问。 | 为极长血缘关系的 RDD 提供容错“存档点”,防止 Driver 崩溃和过长的恢复时间。 |
在绝大多数情况下,cache() 或 persist(StorageLevel.MEMORY_AND_DISK) 是你需要的工具。只有当你明确地意识到 RDD 的血缘关系已经长到可能引发问题时,才需要动用 checkpoint() 这个“重型武器”。
第三部分:PySpark SQL 与 DataFrame – 结构化数据分析的利器
如果说 RDD 是 Spark 的“汇编语言”,提供了最底层、最灵活的分布式操作能力,那么 DataFrame API 就是 Spark 的“高级语言”(如 Python 或 Java)。它为处理结构化和半结构化数据带来了前所未有的简洁性、高性能和可优化性。对于绝大多数数据分析和 ETL 任务,DataFrame 都是比 RDD 更好的选择。本部分将深入探索 DataFrame 的世界,从其设计理念到高级应用,揭示其高性能背后的秘密。
第三章:从 RDD 到 DataFrame:一场结构化的革命
3.1 DataFrame 是什么:超越 RDD 的“模式”与“优化”
DataFrame,从概念上讲,可以被看作是一个带有“模式”(Schema)的、不可变的分布式数据集,其数据被组织成一系列命名的列(Columns)。如果你熟悉 pandas 的 DataFrame 或者关系型数据库中的表,那么 Spark 的 DataFrame 会让你感到非常亲切。它就像一张分布在成百上千台机器上的巨大表格。
但 DataFrame 绝不仅仅是“带列名的 RDD”。它的出现,是 Spark 演进过程中的一次范式转移,带来了两大革命性的优势:
模式(Schema)的引入:
RDD 的局限性: RDD 是“类型无知”的。对于 Spark 来说,一个 RDD sc.parallelize([(1, "Alice"), (2, "Bob")]) 内部只是一堆无差别的 Java 对象(在 PySpark 中是序列化后的 Python 对象)。Spark 不知道第一个元素是整数 id,第二个元素是字符串 name。这意味着,当你执行 rdd.map(lambda t: t[0] + 1) 这样的操作时,其合法性检查只能在运行时(Runtime)进行,并且 Spark 无法利用数据类型信息进行优化。
DataFrame 的优势: DataFrame 强制要求数据必须有关联的模式。模式定义了每一列的名称(如 “id”, “name”)和数据类型(如 IntegerType, StringType)。这个模式信息为 Spark 提供了关于数据结构的丰富元数据。
Catalyst 优化器与 Tungsten 执行引擎:
查询优化: 有了模式,Spark SQL 的核心——Catalyst 优化器——就有了大展拳脚的空间。当你使用 DataFrame API(或纯 SQL 语句)构建一个查询时,Catalyst 会将你的查询逻辑转换成一个抽象语法树(AST),然后应用一系列复杂的、基于规则的优化策略(如谓词下推、列裁剪、常量折叠等),最终生成一个最优化的物理执行计划。这个过程对用户是完全透明的。
高效执行: 生成的物理计划最终会在 Tungsten 执行引擎上运行。Tungsten 是 Spark 的物理执行后端,它通过一系列底层技术(如直接操作二进制数据、避免 JVM 对象开销、为现代 CPU 生成优化代码等)极大地提升了执行效率。
总结来说,RDD 和 DataFrame 的核心区别在于:
RDD API (rdd.map, rdd.filter): 你告诉 Spark “如何做”(How)。你的代码直接定义了操作的执行逻辑。
DataFrame API (df.select, df.where): 你告诉 Spark “做什么”(What)。你声明性地描述你想要的结果,而由 Catalyst 优化器去决定**“如何以最优的方式去做”**。
这种从“命令式”到“声明式”的转变,将大量的性能优化工作从开发者手中解放出来,交给了 Spark 引擎本身,使得即便是没有深厚分布式系统背景的分析师和开发者,也能编写出高性能的分布式代码。
创建 DataFrame:多种数据源的统一入口
SparkSession 对象是创建 DataFrame 的统一入口。它的 read 属性提供了一个 DataFrameReader 接口,可以从多种数据源加载数据并自动推断或指定模式。
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
# 创建 SparkSession
spark = SparkSession.builder
.appName("DataFrameCreation")
.master("local")
.getOrCreate()
# --- 方式一:从已有 RDD 创建 ---
sc = spark.sparkContext
rdd = sc.parallelize([("Alice", 30), ("Bob", 25), ("Charlie", 35)])
# 1a. toDF() 自动推断模式
df_from_rdd1 = rdd.toDF(["name", "age"])
// toDF() 是一个便捷方法,可以为列指定名称
print("--- 从 RDD 创建 (自动推断模式) ---")
df_from_rdd1.printSchema() # 打印模式信息
df_from_rdd1.show() # show() 是一个动作,用于在控制台打印 DataFrame 的内容
# 1b. createDataFrame() 指定完整模式
schema = StructType([
StructField("name", StringType(), True), # 字段名,字段类型,是否可为空
StructField("age", IntegerType(), True)
])
df_from_rdd2 = spark.createDataFrame(rdd, schema)
print("
--- 从 RDD 创建 (指定完整模式) ---")
df_from_rdd2.printSchema()
df_from_rdd2.show()
# --- 方式二:从外部数据源读取 (最常见的方式) ---
# Spark 支持多种格式:csv, json, parquet, orc, jdbc, text 等
# 我们创建一个临时的 CSV 文件来演示
with open("people.csv", "w") as f:
f.write("name,age,height
")
f.write("David,42,1.85
")
f.write("Eva,29,1.68
")
f.write("Frank,,1.75
") # 注意这里有一个缺失的 age 值
# 2a. 读取 CSV,自动推断模式和头部
df_from_csv = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("people.csv")
// .option("header", "true") 告诉 Spark 文件第一行是列名
// .option("inferSchema", "true") 告诉 Spark 自动扫描数据来推断每列的类型。
// 注意:inferSchema 需要额外的一次数据扫描,对于大文件可能会有性能开销。
print("
--- 从 CSV 读取 (自动推断模式) ---")
df_from_csv.printSchema() # age 会被推断为 integer, height 为 double
df_from_csv.show()
# 2b. 读取 CSV,手动指定模式 (生产环境推荐)
csv_schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
StructField("height", DoubleType(), True)
])
df_from_csv_with_schema = spark.read
.option("header", "true")
.schema(csv_schema)
.csv("people.csv")
print("
--- 从 CSV 读取 (手动指定模式) ---")
df_from_csv_with_schema.printSchema()
df_from_csv_with_schema.show()
# 你也可以用一行链式调用完成
# df = spark.read.csv("path", header=True, inferSchema=True)
# --- 方式三:从 Python 列表或 pandas DataFrame 创建 ---
python_list = [("Grace", 55), ("Heidi", 23)]
df_from_list = spark.createDataFrame(python_list, ["name", "age"])
print("
--- 从 Python 列表创建 ---")
df_from_list.show()
# 停止 SparkSession
spark.stop()
3.2 核心操作(一):选择、过滤与添加列 (select, filter/where, withColumn)
DataFrame API 的设计是声明式的,其核心操作与 SQL 的关键字非常相似,这使得有 SQL 基础的用户可以快速上手。所有的 DataFrame 操作也都是惰性的转换操作,它们会返回一个新的 DataFrame,直到一个动作(如 show(), count(), collect())被调用时才会触发计算。
select(*cols): 选择列
select() 用于从 DataFrame 中选择一个或多个列,其作用类似于 SQL 中的 SELECT 子句。它的参数可以是列名字符串,也可以是 Column 对象。
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit # 导入 col 和 lit 函数
spark = SparkSession.builder.appName("SelectOperations").getOrCreate()
# 准备数据
data = [
("James", "Smith", "USA", "CA", 3000),
("Michael", "Rose", "USA", "NY", 4000),
("Robert", "Williams", "USA", "CA", 4000),
("Maria", "Jones", "USA", "FL", 5000)
]
columns = ["firstname", "lastname", "country", "state", "salary"]
df = spark.createDataFrame(data, columns)
print("原始 DataFrame:")
df.show()
# --- 方式一:使用列名字符串 ---
# 选择 firstname 和 salary 两列
df.select("firstname", "salary").show()
# --- 方式二:使用 col() 函数获取 Column 对象 (推荐) ---
# col() 函数使得我们可以对列进行操作
df.select(col("firstname"), col("lastname")).show()
# --- 方式三:从 DataFrame 对象本身访问列 ---
df.select(df.firstname, df.salary).show()
df.select(df["firstname"], df["salary"]).show() # 两种等价的访问方式
# --- 对列进行操作和重命名 ---
# 计算每个人的年薪,并给新列起一个别名
# lit() 函数用于创建一个常量值的列
df.select(
col("firstname"),
col("salary"),
(col("salary") * 12).alias("annual_salary"), # 对 salary 列进行计算,并使用 alias() 重命名
lit("USD").alias("currency") # 使用 lit() 创建一个内容全为 "USD" 的新列
).show()
filter(condition) 或 where(condition): 过滤行
filter() 和 where() 的功能完全相同(where 是 filter 的别名),它们根据给定的条件过滤 DataFrame 中的行,类似于 SQL 中的 WHERE 子句。条件表达式是一个返回布尔值的 Column 对象。
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("FilterOperations").getOrCreate()
df = spark.createDataFrame(data, columns) # 使用上一节的数据
print("原始 DataFrame:")
df.show()
# --- 场景一:简单条件 ---
# 选择薪水大于 4000 的员工
df.filter(col("salary") > 4000).show()
# 等价于 df.where(df["salary"] > 4000).show()
# --- 场景二:多重条件 (AND) ---
# 选择在 'CA' 州且薪水等于 4000 的员工
# 使用 & 操作符连接条件
df.filter((col("state") == "CA") & (col("salary") == 4000)).show()
# --- 场景三:多重条件 (OR) ---
# 选择在 'NY' 州或 'FL' 州的员工
# 使用 | 操作符连接条件
df.filter((col("state") == "NY") | (col("state") == "FL")).show()
# --- 场景四:使用 SQL 表达式字符串 ---
# 这种方式更灵活,但可能失去一些编译时类型检查
df.filter("salary > 4000").show()
df.where("state = 'CA' AND salary = 4000").show()
# --- 场景五:复杂的条件组合 ---
# 选择薪水小于等于 4000 并且 (来自 CA 州或 NY 州) 的员工
df.filter(
(col("salary") <= 4000) &
(col("state").isin("CA", "NY")) # isin() 用于检查列的值是否在给定的集合中
).show()
withColumn(colName, col): 添加或替换列
withColumn() 是一个极其强大的转换操作,用于向 DataFrame 添加一个新列,或者替换一个同名的现有列。
colName (string): 新列的名称。
col (Column): 定义新列如何计算的 Column 对象。
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when, concat_ws
spark = SparkSession.builder.appName("WithColumnOperations").getOrCreate()
df = spark.createDataFrame(data, columns) # 使用之前的数据
print("原始 DataFrame:")
df.show()
# --- 场景一:添加一个常量列 ---
df_with_country = df.withColumn("country_code", lit("US"))
// 添加一个名为 country_code 的新列,其值全部为常量 "US"
df_with_country.show()
# --- 场景二:基于现有列计算新列 ---
df_with_bonus = df.withColumn("salary_with_bonus", col("salary") * 1.1)
// 添加一个名为 salary_with_bonus 的新列,其值为原 salary 的 1.1 倍
df_with_bonus.show()
# --- 场景三:替换一个现有列 ---
# 假设我们要将 salary 统一增加 500
df_updated_salary = df.withColumn("salary", col("salary") + 500)
// "salary" 列已存在,所以这个操作会用新计算的值替换它
df_updated_salary.show()
# --- 场景四:使用条件逻辑 (CASE WHEN) 添加新列 ---
# 使用 when(condition, value).otherwise(other_value) 来实现 SQL 的 CASE WHEN 逻辑
df_with_grade = df.withColumn("salary_grade",
when(col("salary") >= 5000, "High")
.when(col("salary") >= 4000, "Medium")
.otherwise("Low") # otherwise() 相当于 ELSE
)
// 根据 salary 的值给员工划分等级
df_with_grade.show()
# --- 场景五:组合多个列 ---
# 使用 concat_ws(separator, *cols) 函数将多个列合并成一个字符串
df_with_fullname = df.withColumn("full_name", concat_ws(" ", col("firstname"), col("lastname")))
// 创建一个 full_name 列,内容是 firstname 和 lastname 以空格连接
df_with_fullname.show()
withColumnRenamed(existingName, newName): 重命名列
这是一个简单的转换,用于更改现有列的名称。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("RenameColumn").getOrCreate()
df = spark.createDataFrame(data, columns)
# 将 "salary" 列重命名为 "monthly_income"
df_renamed = df.withColumnRenamed("salary", "monthly_income")
df_renamed.printSchema()
df_renamed.show()
通过组合使用 select, filter, withColumn,我们就可以完成绝大多数数据清洗和特征工程中的列操作和行过滤任务。这些操作共同构成了 DataFrame 数据处理流程的核心骨架。
3.3 核心操作(二):聚合、分组与排序 (groupBy, agg, orderBy)
数据分析的核心任务之一就是聚合(Aggregation)——将海量的数据行总结成有意义的统计指标。Spark DataFrame 提供了强大且灵活的 API 来执行分组聚合操作,其核心是 groupBy() 方法和 agg() 方法。
groupBy(*cols) 或 groupby(*cols): 按列分组
groupBy() 方法根据一个或多个列对 DataFrame 进行分组,它返回一个 GroupedData 对象。GroupedData 对象本身不是一个 DataFrame,你必须对它调用一个聚合函数(如 count(), sum(), avg())来计算每个组的统计值,从而得到一个新的 DataFrame。
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, sum, avg, min, max, countDistinct
spark = SparkSession.builder.appName("GroupByOperations").getOrCreate()
# 准备数据
data_agg = [
("James", "Sales", "NY", 90000, 34, 10000),
("Michael", "Sales", "NY", 86000, 56, 20000),
("Robert", "Sales", "CA", 81000, 30, 23000),
("Maria", "Finance", "CA", 90000, 24, 23000),
("Raman", "Finance", "CA", 99000, 40, 24000),
("Scott", "Finance", "NY", 83000, 36, 19000),
("Jen", "Finance", "NY", 79000, 53, 15000),
("Jeff", "Marketing", "CA", 80000, 25, 18000),
("Kumar", "Marketing", "NY", 91000, 50, 21000)
]
columns_agg = ["name", "department", "state", "salary", "age", "bonus"]
df = spark.createDataFrame(data=data_agg, schema=columns_agg)
print("原始 DataFrame:")
df.show()
# --- 场景一:单列分组和单个聚合 ---
# 按 department 分组,计算每个部门的平均工资
df.groupBy("department").avg("salary").show()
// .avg("salary") 是 GroupedData 对象上的一个聚合方法
// 结果 DataFrame 会有两列:'department' 和 'avg(salary)'
# 按 department 分组,计算每个部门的员工人数
df.groupBy("department").count().show()
// .count() 是最常用的聚合方法之一
# --- 场景二:多列分组 ---
# 同时按 department 和 state 分组,计算每个部门在每个州的最高薪水
df.groupBy("department", "state").max("salary").show()
// 分组的键是 (department, state) 组合
agg(*exprs): 执行多个聚合
当你需要对每个分组计算多个聚合指标时,agg() 方法就派上用场了。它接收一个或多个聚合表达式作为参数。
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, avg, max, count # 导入需要的聚合函数
spark = SparkSession.builder.appName("AggOperations").getOrCreate()
df = spark.createDataFrame(data=data_agg, schema=columns_agg)
# --- 场景一:对同一个分组计算多个聚合 ---
# 按 department 分组,同时计算总工资、平均工资和最高工资
df.groupBy("department").agg(
sum("salary").alias("total_salary"), # 使用 alias() 给聚合结果列重命名,这是一个好习惯
avg("salary").alias("average_salary"),
max("salary").alias("max_salary")
).show()
# --- 场景二:对不同列执行不同聚合 ---
# agg() 的参数也可以是一个字典,key 是要聚合的列名,value 是聚合函数名字符串
df.groupBy("department").agg(
{
'salary': 'avg', 'bonus': 'sum'}
).show()
// 这个语法更简洁,但不够灵活(比如不能直接重命名)
# --- 场景三:对所有列执行聚合 ---
# 在没有 groupBy 的情况下调用 agg(),会对整个 DataFrame 进行聚合
df.agg(
sum("salary").alias("total_salary_for_company"),
count("*").alias("total_employees") # count("*") 或 count(lit(1)) 用于计算总行数
).show()
orderBy(*cols, **kwargs) 或 sort(*cols, **kwargs): 排序
orderBy() 和 sort() 的功能完全相同,用于根据一个或多个列对 DataFrame 的行进行排序。默认是升序。
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, asc, desc
spark = SparkSession.builder.appName("OrderByOperations").getOrCreate()
df = spark.createDataFrame(data=data_agg, schema=columns_agg)
print("原始 DataFrame:")
df.show()
# --- 场景一:单列排序 ---
# 按薪水升序排序 (默认)
df.orderBy("salary").show()
# 显式指定升序
df.orderBy(col("salary").asc()).show()
# --- 场景二:单列降序排序 ---
df.orderBy(col("salary").desc()).show()
# 或者 df.sort(df.salary.desc()).show()
# --- 场景三:多列排序 ---
# 先按 department 升序排序,如果 department 相同,再按 salary 降序排序
df.orderBy(col("department").asc(), col("salary").desc()).show()
# 等价于 df.sort("department", "salary", ascending=[1, 0]) # 1 for asc, 0 for desc
# --- 组合使用:分组聚合后排序 ---
# 先计算每个部门的平均工资,然后按平均工资降序排列,找出薪资最高的部门
df.groupBy("department")
.agg(avg("salary").alias("avg_salary"))
.orderBy(col("avg_salary").desc())
.show()
groupBy, agg, 和 orderBy 的组合是数据分析工作的核心。通过链式调用它们,你可以构建出强大的、富有表现力的数据聚合管道,从原始数据中提取出高价值的洞察。理解并熟练运用这些操作,是掌握 DataFrame 数据分析的关键。
3.4 连接操作:join 的艺术与策略
在真实世界的数据分析项目中,数据很少会整齐地存放在一张大表中。通常,信息会分散在多个表中,我们需要通过**连接(Join)**操作将它们关联起来,形成一个更宽、信息更丰富的视图。Spark DataFrame 提供了强大而灵活的 join API,支持关系型数据库中所有标准的连接类型,并针对分布式环境进行了深度优化。
join 是一个转换操作,它将两个 DataFrame 根据一个或多个连接键(Join Key)组合成一个新的 DataFrame。
join 的基本语法:
df1.join(df2, join_expression, join_type)
df1: 左侧的 DataFrame。
df2: 右侧的 DataFrame。
join_expression: 连接条件。它可以是一个列名字符串(当左右 DataFrame 连接键列名相同时),一个列名列表(用于多列连接),或者是一个复杂的列表达式。
join_type: 连接类型。一个字符串,指定连接的方式。如果省略,默认为 "inner"。
连接类型(Join Types)深度解析:
| Join Type | 描述 | SQL 等价物 |
|---|---|---|
"inner" |
内连接: 只返回在两个 DataFrame 中连接键都能匹配上的行。 | INNER JOIN |
"outer", "full", "full_outer" |
全外连接: 返回两个 DataFrame 中的所有行。如果某一行在一个 DataFrame 中没有匹配项,则另一个 DataFrame 的列将填充为 null。 |
FULL OUTER JOIN |
"left", "left_outer" |
左外连接: 返回左侧 DataFrame 的所有行,以及右侧 DataFrame 中能匹配上的行。如果左侧的某行在右侧没有匹配项,则右侧的列将填充为 null。 |
LEFT OUTER JOIN |
"right", "right_outer" |
右外连接: 返回右侧 DataFrame 的所有行,以及左侧 DataFrame 中能匹配上的行。如果右侧的某行在左侧没有匹配项,则左侧的列将填充为 null。 |
RIGHT OUTER JOIN |
"left_semi" |
左半连接: 只返回左侧 DataFrame 中,那些在右侧 DataFrame 中存在匹配连接键的行。结果中不包含右侧 DataFrame 的任何列。 | ... WHERE column IN (SELECT column FROM ...) |
"left_anti" |
左反连接: 只返回左侧 DataFrame 中,那些在右侧 DataFrame 中不存在匹配连接键的行。结果中也不包含右侧 DataFrame 的任何列。 | ... WHERE column NOT IN (SELECT column FROM ...) |
"cross" |
交叉连接 (笛卡尔积): 返回左侧 DataFrame 的每一行与右侧 DataFrame 的每一行的所有可能组合。这是一个极其昂贵的操作,应极力避免在大型数据集上使用。 | CROSS JOIN |
实战代码示例:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("JoinOperations").getOrCreate()
# 准备两个 DataFrame:员工信息和部门信息
emp_data = [
(1, "Alice", 10), (2, "Bob", 20),
(3, "Charlie", 10), (4, "David", None) # David 没有部门
]
emp_columns = ["emp_id", "emp_name", "dept_id"]
emp_df = spark.createDataFrame(data=emp_data, schema=emp_columns)
dept_data = [(10, "Sales"), (20, "HR"), (30, "IT")] # IT 部门目前没有员工
dept_columns = ["dept_id", "dept_name"]
dept_df = spark.createDataFrame(data=dept_data, schema=dept_columns)
print("--- 员工 DataFrame (emp_df) ---")
emp_df.show()
print("--- 部门 DataFrame (dept_df) ---")
dept_df.show()
# --- 内连接 (Inner Join) ---
# 连接两个 df,连接键是 'dept_id'
# 注意:因为两个 df 中都有 'dept_id' 列,连接后的结果会有两列重复的 'dept_id'
print("
--- 内连接 (Inner Join) ---")
emp_df.join(dept_df, emp_df.dept_id == dept_df.dept_id, "inner").show()
# David (dept_id is null) 和 IT 部门 (没有员工匹配) 都不会出现在结果中
# 解决重复列问题的更好写法
print("
--- 内连接 (简洁写法) ---")
emp_df.join(dept_df, on="dept_id", how="inner").show()
# 当连接键列名相同时,可以直接使用 on="col_name" 的语法,结果中只会保留一列连接键
# --- 全外连接 (Full Outer Join) ---
print("
--- 全外连接 (Full Outer Join) ---")
emp_df.join(dept_df, on="dept_id", how="outer").show()
# David 和 IT 部门都会出现在结果中,对应的另一方列为 null
# --- 左外连接 (Left Outer Join) ---
print("
--- 左外连接 (Left Outer Join) ---")
emp_df.join(dept_df, on="dept_id", how="left").show()
# 所有员工都会出现。David 对应的 dept_name 为 null
# --- 右外连接 (Right Outer Join) ---
print("
--- 右外连接 (Right Outer Join) ---")
emp_df.join(dept_df, on="dept_id", how="right").show()
# 所有部门都会出现。IT 部门对应的员工信息为 null
# --- 左半连接 (Left Semi Join) ---
# 找出所有 "有部门" 的员工信息
print("
--- 左半连接 (Left Semi Join) ---")
emp_df.join(dept_df, on="dept_id", how="left_semi").show()
# 结果只包含 emp_df 的列,并且 David 不在其中
# --- 左反连接 (Left Anti Join) ---
# 找出所有 "没有部门" 或 "部门信息不存在" 的员工
print("
--- 左反连接 (Left Anti Join) ---")
emp_df.join(dept_df, on="dept_id", how="left_anti").show()
# 结果只包含 emp_df 的列,并且只有 David 在其中
# --- 处理连接键列名不同的情况 ---
# 假设部门 df 的列名是 'd_id' 和 'd_name'
dept_df_new_names = dept_df.withColumnRenamed("dept_id", "d_id")
.withColumnRenamed("dept_name", "d_name")
print("
--- 新的部门 DataFrame ---")
dept_df_new_names.show()
print("
--- 连接键列名不同时的连接 ---")
emp_df.join(dept_df_new_names, emp_df.dept_id == dept_df_new_names.d_id, "inner").show()
# 这种情况下必须使用显式的列表达式 `df1.col == df2.col`
# 结果中会包含 dept_id 和 d_id 两列
spark.stop()
连接策略与性能考量:
在分布式环境中,join 是一个典型的会引发大规模数据混洗(Shuffle)的操作,因此其性能至关重要。Spark 的 Catalyst 优化器会自动根据数据的大小、统计信息和集群状态选择最优的连接策略。了解这些策略有助于我们编写更高效的连接查询。
Shuffle Sort Merge Join: 这是最通用、最稳健的连接策略。
工作流程:
Shuffle 阶段: 对两个 DataFrame,都根据连接键进行重新分区(使用哈希分区器),确保具有相同连接键的行被发送到同一个 Executor 节点。
Sort 阶段: 在每个 Executor 节点内部,对接收到的数据按连接键进行排序。
Merge 阶段: 对两个已排序的数据集进行“归并”操作,像拉拉链一样将匹配的行连接起来。
优点: 可靠,能处理任意大小的数据,内存占用相对固定。
缺点: 涉及 Shuffle 和排序,开销较大。
Broadcast Hash Join (Map-Side Join): 这是最高效的连接策略,但有其适用条件。
适用条件: 其中一个 DataFrame 非常小,小到可以完全加载到每个 Executor 的内存中(通常建议小于几十 MB,可通过 spark.sql.autoBroadcastJoinThreshold 配置阈值)。
工作流程:
Broadcast 阶段: 将小 DataFrame 的所有数据收集到 Driver,然后由 Driver 广播(Broadcast)到集群中的每一个 Executor。
Hash Join 阶段: 在每个 Executor 上,大 DataFrame 的每个分区可以直接与内存中的小 DataFrame(通常构建成一个哈希表)进行连接,完全不需要 Shuffle 大的 DataFrame。
优点: 避免了对大表的 Shuffle,速度极快。
缺点: 对小表的大小有严格限制,如果广播的表过大,可能导致 Executor 内存溢出或网络拥塞。
from pyspark.sql.functions import broadcast
# 假设 dept_df 非常小
# 我们可以使用 broadcast() 函数给 Catalyst 优化器一个强烈的“暗示”
# 告诉它我们希望使用广播连接
print("
--- 广播连接 (Broadcast Join) ---")
emp_df.join(broadcast(dept_df), on="dept_id", how="inner").show()
# 可以通过 .explain() 查看执行计划来确认是否使用了 BroadcastHashJoin
emp_df.join(broadcast(dept_df), on="dept_id", how="inner").explain()
选择正确的连接类型和策略,是优化 Spark 作业性能的关键一步。
数据过滤优先: 在 join 之前,尽可能地使用 filter 或 where 对两个 DataFrame 进行过滤,减少需要参与连接的数据量。
善用半连接和反连接: 如果你只关心是否存在匹配,而不需要右表的列,left_semi 和 left_anti 远比先 join 再 select 或 filter 高效,因为它们能极大地减少数据移动。
识别广播机会: 在“事实表”与“维度表”的连接场景中,维度表通常很小,是使用广播连接的绝佳机会。主动使用 broadcast() 函数可以确保 Spark 选择最优策略。
第四部分:深入 PySpark SQL 与高级功能
在掌握了 DataFrame 的基本操作之后,我们将进一步探索 PySpark 提供的更高级的功能,包括直接运行 SQL 查询、使用强大的内置函数库、处理复杂数据类型以及定义用户自己的函数(UDF)。这些工具将使我们能够应对更复杂、更精细的数据分析挑战。
第四章:释放 SQL 的力量:临时视图与内置函数
4.1 createOrReplaceTempView:在 Spark 中运行纯 SQL
对于许多习惯于使用 SQL 的数据分析师和工程师来说,直接编写 SQL 查询比使用链式的 DataFrame API 更自然、更直观。PySpark 完全支持这种工作方式,它允许你将一个 DataFrame 注册为一个临时视图(Temporary View),然后就可以使用 spark.sql() 方法在这个视图上执行任何标准的 SQL 查询。
临时视图的特性:
它是**会话级别(Session-scoped)**的。这意味着一个临时视图只在创建它的那个 SparkSession 中可见。当会话结束时,视图也会自动消失。
它不持有任何数据,只是指向一个底层 DataFrame 的一个别名。对视图的查询,实际上是在其底层的 DataFrame 上执行的。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SQLonDataFrame").getOrCreate()
# 准备数据
emp_data = [
(1, "Alice", "Sales", 5000), (2, "Bob", "HR", 4000),
(3, "Charlie", "Sales", 6000), (4, "David", "IT", 7500)
]
emp_columns = ["id", "name", "dept", "salary"]
emp_df = spark.createDataFrame(data=emp_data, schema=emp_columns)
# 1. 将 DataFrame 注册为一个临时视图
# 视图的名称是 "employees"
emp_df.createOrReplaceTempView("employees")
print("DataFrame 已注册为临时视图 'employees'")
# 2. 使用 spark.sql() 执行 SQL 查询
# spark.sql() 的返回值是一个新的 DataFrame
# 场景一:简单的 SELECT
high_earners_df = spark.sql("SELECT name, salary FROM employees WHERE salary > 5000")
print("
--- 查询薪水大于 5000 的员工 ---")
high_earners_df.show()
high_earners_df.printSchema() # 结果也是一个标准的 DataFrame
# 场景二:聚合查询
dept_salary_df = spark.sql("""
SELECT
dept,
COUNT(*) AS num_employees,
AVG(salary) AS avg_salary
FROM employees
GROUP BY dept
ORDER BY avg_salary DESC
""")
// 一个标准的分组聚合 SQL 查询
print("
--- 按部门统计平均薪水 ---")
dept_salary_df.show()
# 3. 视图与 DataFrame API 的互操作
# 你可以无缝地在 SQL 和 DataFrame API 之间切换
# 例如,对 SQL 查询返回的 DataFrame 继续使用 API 进行操作
filtered_depts = dept_salary_df.filter("num_employees >= 2")
print("
--- 筛选出员工数大于等于2的部门 (使用 DataFrame API) ---")
filtered_depts.show()
# **全局临时视图 (Global Temporary View)**
# 如果你需要一个跨 SparkSession 都能访问的视图,可以使用 createGlobalTempView
# 全局视图会被注册到一个名为 'global_temp' 的系统保留数据库中
# 查询时需要使用 'global_temp.view_name' 的方式引用
emp_df.createOrReplaceGlobalTempView("global_employees")
# 在一个新的 SparkSession 中也可以访问它 (这里用同一个 session 演示)
spark.sql("SELECT * FROM global_temp.global_employees WHERE dept = 'HR'").show()
spark.stop()
createOrReplaceTempView 的能力极大地增强了 PySpark 的灵活性。它允许团队中的不同角色(例如,习惯 Python 的数据科学家和习惯 SQL 的数据分析师)在同一个项目、同一个数据上协作,而无需进行任何数据转换。
4.2 pyspark.sql.functions:一个强大的函数宝库
为了避免在 DataFrame 操作中编写大量的 UDF(用户定义函数),也为了让代码更简洁、性能更高,PySpark 提供了一个极其丰富的内置函数库——pyspark.sql.functions。这个模块中包含了数百个预先定义好的、经过高度优化的函数,涵盖了字符串操作、日期时间处理、数学计算、聚合、数组和 Map 操作等方方面面。
使用内置函数的黄金法则: 能用内置函数解决的问题,就永远不要自己写 UDF。 内置函数可以直接在 JVM 中执行,避免了 Python 和 Java 之间的数据序列化/反序列化开销,性能远高于 UDF。
通常,我们会使用 import pyspark.sql.functions as F 的方式来导入这个模块。
函数宝库分类概览:
1. 字符串函数 (String Functions)
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("StringFunctions").getOrCreate()
data = [
(1, " John Doe ", "john.doe@example.com"),
(2, "Jane Smith", "jane.smith@example.com"),
(3, " peter jones ", "PETER.JONES@EXAMPLE.COM")
]
df = spark.createDataFrame(data, ["id", "name", "email"])
df.show(truncate=False)
# trim, ltrim, rtrim: 去除两端、左侧、右侧的空格
# lower, upper: 转换为小写、大写
# initcap: 将每个单词的首字母转为大写
df_cleaned = df.withColumn("name_cleaned", F.initcap(F.trim(F.col("name"))))
.withColumn("email_lower", F.lower(F.col("email")))
print("
--- 清洗后的 DataFrame ---")
df_cleaned.show(truncate=False)
# substring(str, pos, len): 提取子字符串
# split(str, pattern): 按分隔符拆分字符串,返回一个数组
# regexp_replace(str, pattern, replacement): 正则替换
df_processed = df_cleaned.withColumn("username", F.split(F.col("email_lower"), "@")[0])
.withColumn("domain", F.split(F.col("email_lower"), "@")[1])
.withColumn("name_initials", F.concat(F.substring(F.col("name_cleaned"), 1, 1), F.lit(".")))
print("
--- 进一步处理后的 DataFrame ---")
df_processed.show(truncate=False)
spark.stop()
2. 日期和时间戳函数 (Date & Timestamp Functions)
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DateTimeFunctions").getOrCreate()
data = [
(1, "2023-10-27 10:30:00"),
(2, "2023-11-01 23:59:59"),
(3, "2024-01-01 00:00:00")
]
df = spark.createDataFrame(data, ["id", "event_str"])
# to_timestamp: 将字符串转换为时间戳类型
# current_timestamp: 获取当前时间戳
df_with_ts = df.withColumn("event_ts", F.to_timestamp(F.col("event_str")))
.withColumn("processing_ts", F.current_timestamp())
print("
--- 添加时间戳列 ---")
df_with_ts.printSchema()
df_with_ts.show(truncate=False)
# year, month, dayofmonth, hour, minute, second: 提取时间戳的各个部分
# date_format: 按指定格式将日期/时间戳格式化为字符串
# datediff: 计算两个日期之间的天数差
df_features = df_with_ts.withColumn("event_year", F.year(F.col("event_ts")))
.withColumn("event_month", F.month(F.col("event_ts")))
.withColumn("event_date_str", F.date_format(F.col("event_ts"), "yyyy-MM-dd"))
.withColumn("days_since_event", F.datediff(F.current_date(), F.col("event_ts")))
print("
--- 提取日期时间特征 ---")
df_features.show(truncate=False)
# date_add, date_sub: 对日期进行加减
df_future = df_features.withColumn("one_week_later", F.date_add(F.col("event_ts"), 7))
print("
--- 日期加减 ---")
df_future.show(truncate=False)
spark.stop()
3. 集合函数 (Collection Functions – Array & Map)
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CollectionFunctions").getOrCreate()
data = [
(1, ["python", "spark", "sql"]),
(2, ["java", "hadoop"]),
(3, ["python", "pandas", "numpy"])
]
df = spark.createDataFrame(data, ["user_id", "skills"])
print("--- 原始 DataFrame (含数组列) ---")
df.show(truncate=False)
# size(col): 返回数组或 Map 的大小 (长度)
# array_contains(col, value): 检查数组是否包含某个值
# explode(col): 将数组中的每个元素“爆炸”成独立的行
df_processed = df.withColumn("num_skills", F.size(F.col("skills")))
.withColumn("has_python", F.array_contains(F.col("skills"), "python"))
print("
--- 数组基本操作 ---")
df_processed.show()
print("
--- 使用 explode 将技能拆分成行 ---")
df.withColumn("skill", F.explode(F.col("skills"))).show()
// 这个操作常用于对标签、兴趣等多值属性进行分析
# 创建一个带 Map 类型的 DataFrame
map_data = [
(1, {
"score": 90, "rank": "A"}),
(2, {
"score": 75, "rank": "C", "retake": "true"})
]
map_df = spark.createDataFrame(map_data, ["id", "metrics"])
map_df.show(truncate=False)
# getitem(key) 或 [key]: 获取 Map 中指定 key 的 value
map_df.withColumn("score", F.col("metrics")["score"])
.withColumn("rank", F.col("metrics").getItem("rank"))
.show()
4. 其他常用函数
when(condition, value).otherwise(otherValue): 实现 CASE ... WHEN ... ELSE 逻辑,极其常用。
coalesce(*cols): 返回参数列表中的第一个非空值。常用于填充 null 值。
rand(): 生成一个 [0.0, 1.0) 之间的随机数列,常用于随机抽样。
isnull(col), isnotnull(col): 检查列值是否为空。
熟练地在 select, withColumn, filter, agg 中组合使用 pyspark.sql.functions 中的函数,是编写简洁、高效、可读性强的 PySpark 代码的关键。在遇到任何数据转换需求时,第一反应应该是去这个函数库中寻找是否已经有现成的解决方案。
4.3 窗口函数:超越 groupBy 的分析能力
groupBy 聚合操作功能强大,但它有一个固有的特点:它会将多行数据“压缩”成一行。例如,df.groupBy("department").avg("salary") 会返回每个部门一行,原始的员工个人信息(如姓名、年龄)都丢失了。
然而,在许多分析场景中,我们希望在保持原始行数不变的情况下,进行聚合计算。例如:
计算每个员工的薪水与他/她所在部门平均薪水的差异。
找出每个部门内薪水排名前三的员工。
计算每个用户随时间变化的累计购买总额。
这些需求都无法仅通过 groupBy 实现。为了解决这类问题,SQL 和 Spark 引入了窗口函数(Window Functions)。
窗口函数的核心思想是:对一组与当前行相关的行(这个组被称为“窗口”或“窗口帧”)进行计算,并将计算结果返回到当前行,而不改变原始的行数。
窗口函数的三大核心组件:
要使用窗口函数,我们必须先通过 pyspark.sql.Window 类来定义一个窗口规范(Window Specification)。这个规范定义了窗口的范围和行为,它由三个部分组成:
分区(Partitioning): partitionBy(*cols)
作用:定义如何将数据分组。窗口函数将在每个分区(组)内独立进行计算。这与 groupBy 的分组概念非常相似。
如果你想计算“每个部门”的平均工资,你就应该 partitionBy("department")。
排序(Ordering): orderBy(*cols)
作用:定义在一个分区内部,行的排序方式。这对于需要考虑顺序的窗口函数(如排名函数、移动平均)至关重要。
如果你想找出“薪水排名前三”的员工,你就必须在分区内按薪水 orderBy(col("salary").desc())。
窗口帧(Frame): rowsBetween(start, end) 或 rangeBetween(start, end)
作用:定义在排好序的分区内,具体哪些行被包含在当前行的计算窗口中。这是一个更高级的概念,用于计算如“移动平均”等。
start 和 end 可以是:
Window.unboundedPreceding: 从分区的起点开始。
Window.unboundedFollowing: 到分区的终点结束。
Window.currentRow: 当前行。
一个数字偏移量(如 -1, 1)。
窗口函数的分类与实战:
1. 排名函数 (Ranking Functions)
这类函数用于在分区内对行进行排名。
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as F
spark = SparkSession.builder.appName("WindowRankingFunctions").getOrCreate()
# 准备数据
data = [
("James", "Sales", 3000), ("Michael", "Sales", 4600),
("Robert", "Sales", 4100), ("Maria", "Finance", 3000),
("Scott", "Finance", 3300), ("Jen", "Finance", 3900),
("Jeff", "Marketing", 3000), ("Kumar", "Marketing", 2000),
("Saif", "Sales", 4100) # Saif 和 Robert 薪水相同
]
columns = ["name", "department", "salary"]
df = spark.createDataFrame(data=data, schema=columns)
print("--- 原始 DataFrame ---")
df.show()
# 需求:找出每个部门薪水排名前 3 的员工
# 步骤 1: 定义窗口规范
# 我们需要在每个部门(department)内部,按薪水(salary)降序排列
window_spec = Window.partitionBy("department").orderBy(F.col("salary").desc())
# 步骤 2: 使用排名函数
# row_number(): 连续排名,即使值相同,排名也不同 (1, 2, 3, 4)
# rank(): 跳跃排名,值相同则排名相同,下一名会跳跃 (1, 1, 3, 4)
# dense_rank(): 连续排名,值相同则排名相同,下一名不跳跃 (1, 1, 2, 3)
df_with_rank = df.withColumn("row_number", F.row_number().over(window_spec))
.withColumn("rank", F.rank().over(window_spec))
.withColumn("dense_rank", F.dense_rank().over(window_spec))
print("
--- 添加排名后的 DataFrame ---")
# 注意 Robert 和 Saif 的排名区别
df_with_rank.show()
# 步骤 3: 筛选出排名前 3 的员工
print("
--- 每个部门薪水排名前3的员工 (使用 rank) ---")
df_with_rank.filter(F.col("rank") <= 3).show()
2. 分析函数 (Analytic Functions)
这类函数用于在窗口内获取与其他行相关的值。
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as F
spark = SparkSession.builder.appName("WindowAnalyticFunctions").getOrCreate()
df = spark.createDataFrame(data, columns) # 使用上一节的数据
# 需求:计算每个员工与他/她部门最高薪水的差距
# 步骤 1: 定义窗口规范
# 我们只需要按部门分区,这里不需要排序
window_spec = Window.partitionBy("department")
# 步骤 2: 使用 lag() 和 lead()
# lag(col, offset, default): 获取当前行往前第 offset 行的 col 值
# lead(col, offset, default): 获取当前行往后第 offset 行的 col 值
# 注意:使用 lag/lead 时,窗口规范中必须有 orderBy
window_spec_ordered = Window.partitionBy("department").orderBy("salary")
df_with_lag_lead = df.withColumn("prev_salary", F.lag("salary", 1).over(window_spec_ordered))
.withColumn("next_salary", F.lead("salary", 1).over(window_spec_ordered))
print("
--- 使用 lag 和 lead 函数 ---")
df_with_lag_lead.show()
# 步骤 3: 使用聚合函数作为窗口函数
# 几乎所有的聚合函数 (sum, avg, max, min, count) 都可以用作窗口函数
# 这时,它们会计算窗口内的聚合值,并返回给每一行
df_with_dept_max = df.withColumn("dept_max_salary", F.max("salary").over(window_spec))
print("
--- 添加部门最高薪水列 ---")
df_with_dept_max.show()
# 现在可以轻易计算薪水差距了
df_with_diff = df_with_dept_max.withColumn("diff_from_max", F.col("dept_max_salary") - F.col("salary"))
print("
--- 计算与部门最高薪水的差距 ---")
df_with_diff.show()
3. 累计聚合 (Cumulative Aggregation) 与移动平均 (Moving Average)
这是窗口函数最高级的应用,通过定义**窗口帧(Window Frame)**来实现。
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as F
spark = SparkSession.builder.appName("WindowFrameFunctions").getOrCreate()
# 准备时间序列数据 (月度销售额)
sales_data = [
("2023-01-01", 100), ("2023-02-01", 120), ("2023-03-01", 150),
("2023-04-01", 140), ("2023-05-01", 160), ("2023-06-01", 180)
]
sales_df = spark.createDataFrame(sales_data, ["month", "sales"]).withColumn("month", F.to_date("month"))
print("--- 原始月度销售数据 ---")
sales_df.show()
# 需求 1: 计算每个月的累计销售额
# 窗口帧:从分区的起点 (unboundedPreceding) 到当前行 (currentRow)
cumulative_window_spec = Window.orderBy("month")
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_with_cumulative = sales_df.withColumn("cumulative_sales", F.sum("sales").over(cumulative_window_spec))
print("
--- 计算累计销售额 ---")
df_with_cumulative.show()
# 需求 2: 计算 3 个月的移动平均销售额
# 窗口帧:从当前行往前 2 行 (-2) 到当前行 (0, 即 Window.currentRow)
moving_avg_window_spec = Window.orderBy("month")
.rowsBetween(-2, 0)
# -2 表示当前行和前面的两行,共三行
df_with_moving_avg = sales_df.withColumn("3_month_moving_avg", F.avg("sales").over(moving_avg_window_spec))
print("
--- 计算3个月移动平均销售额 ---")
df_with_moving_avg.show()
# **rowsBetween vs. rangeBetween**
# rowsBetween: 基于行的物理偏移量。rowsBetween(-1, 1) 总是包括前一行、当前行和后一行。
# rangeBetween: 基于排序列的值的范围。需要排序列是数值或日期类型。
# 例如,如果薪水是 [1000, 2000, 2000, 3000],对于薪水为 2000 的行,
# orderBy("salary").rangeBetween(-500, 500) 会将所有薪水在 [1500, 2500] 范围内的行(即两个 2000 的行)都包含进来。
# 大多数情况下,rowsBetween 更直观。
窗口函数是 PySpark 中进行高级数据分析的“瑞士军刀”。它提供了一种强大而富有表现力的方式,来解决那些需要同时考虑个体行和其所在群体(窗口)上下文的复杂问题,而这一切都在不牺牲数据集原始粒度的情况下完成。
4.4 处理缺失值与异常值
在任何数据分析项目中,数据质量都是决定成败的关键。原始数据几乎总是“不干净”的,充满了缺失值(null, None, NaN)、异常值和不一致的数据。PySpark DataFrame API 提供了一套简洁的工具来识别和处理这些问题。
处理缺失值(null / None)
DataFrame API 中有一个专门的属性 df.na,它返回一个 DataFrameNaFunctions 对象,其中包含了处理 null 值的主要方法。
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.appName("MissingData").getOrCreate()
# 准备包含缺失值的数据
data = [
("Alice", 30, 1.65), ("Bob", None, 1.75),
("Charlie", 35, None), ("David", None, None),
(None, 40, 1.80)
]
df = spark.createDataFrame(data, ["name", "age", "height"])
print("--- 含有缺失值的 DataFrame ---")
df.show()
# --- 1. 删除含有缺失值的行 (`dropna`) ---
# df.na.drop()
# 默认 how='any': 只要行中存在任何一个 null,就删除该行
print("
--- 删除任何含有 null 的行 ---")
df.na.drop(how="any").show()
# how='all': 只有当行中所有值都为 null 时才删除
print("
--- 删除所有值均为 null 的行 ---")
df.na.drop(how="all").show()
# thresh=N: 要求行中至少有 N 个非 null 值
print("
--- 要求至少有 2 个非 null 值的行 ---")
df.na.drop(thresh=2).show()
# subset=['col1', 'col2']: 只在指定的列子集中检查 null
print("
--- 只在 'age' 和 'height' 列中检查 null ---")
df.na.drop(subset=["age", "height"]).show()
# --- 2. 填充缺失值 (`fillna`) ---
# df.na.fill(value, subset=None)
# 用一个常量值填充所有数值类型的缺失列
print("
--- 用 0 填充所有数值型缺失值 ---")
df.na.fill(0).show() # "name" 列是字符串,不会被填充
# 用一个常量值填充所有字符串类型的缺失列
print("
--- 用 'Unknown' 填充所有字符串型缺失值 ---")
df.na.fill("Unknown").show() # age 和 height 不会被填充
# 填充指定列
print("
--- 填充 'age' 列为 0,'height' 列为 1.70 ---")
df.na.fill({
'age': 0, 'height': 1.70}).show()
# 使用聚合值进行填充 (更高级的策略)
# 例如,用平均年龄和平均身高来填充
avg_age = df.select(F.avg("age")).first()[0]
avg_height = df.select(F.avg("height")).first()[0]
print(f"
计算出的平均年龄: {
avg_age:.2f}, 平均身高: {
avg_height:.2f}")
print("
--- 使用平均值填充 ---")
df.na.fill({
'age': avg_age, 'height': avg_height}).show()
识别和处理异常值(Outliers)
异常值的处理没有像缺失值那样固定的 API,它更多地依赖于数据分析师的领域知识和统计方法。通常的流程是:
定义“异常”: 使用统计指标(如标准差、四分位数范围 IQR)来定义什么是异常值。
识别异常值: 根据定义,筛选出含有异常值的行。
处理异常值: 可以选择删除这些行,或者使用某种方法(如用上/下限值替换)来“修正”它们。
示例:使用 IQR 方法处理薪水异常值
IQR (Interquartile Range) 方法是一种稳健的异常值检测方法。一个值如果小于 Q1 - 1.5 * IQR 或大于 Q3 + 1.5 * IQR,就被认为是异常值。其中 Q1 是第一四分位数(25th percentile),Q3 是第三四分位数(75th percentile),IQR = Q3 – Q1。
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.appName("OutlierDetection").getOrCreate()
# 准备数据,包含一些异常薪水
data = [
("Alice", "Sales", 5000), ("Bob", "Sales", 5200), ("Charlie", "Sales", 4800),
("David", "Sales", 5100), ("Eve", "Sales", 15000), # 异常高薪
("Frank", "IT", 7000), ("Grace", "IT", 7500), ("Heidi", "IT", 7200),
("Ivan", "IT", 6800), ("Judy", "IT", 500) # 异常低薪
]
df = spark.createDataFrame(data, ["name", "dept", "salary"])
# 步骤 1: 计算每个部门的 Q1, Q3, IQR
# 使用 approxQuantile 函数计算近似分位数
quantiles = df.groupBy("dept").agg(
F.expr("percentile_approx(salary, 0.25)").alias("Q1"),
F.expr("percentile_approx(salary, 0.75)").alias("Q3")
)
quantiles_with_iqr = quantiles.withColumn("IQR", F.col("Q3") - F.col("Q1"))
print("--- 每个部门的薪水四分位数和IQR ---")
quantiles_with_iqr.show()
# 步骤 2: 将边界值连接回原始 DataFrame
df_with_bounds = df.join(quantiles_with_iqr, on="dept", how="left")
df_with_bounds = df_with_bounds.withColumn("lower_bound", F.col("Q1") - 1.5 * F.col("IQR"))
.withColumn("upper_bound", F.col("Q3") + 1.5 * F.col("IQR"))
print("
--- 带有异常值边界的 DataFrame ---")
df_with_bounds.show()
# 步骤 3: 识别异常值
df_with_outlier_flag = df_with_bounds.withColumn("is_outlier",
(F.col("salary") < F.col("lower_bound")) | (F.col("salary") > F.col("upper_bound"))
)
print("
--- 标记出异常值 ---")
df_with_outlier_flag.show()
print("
--- 只显示异常值记录 ---")
df_with_outlier_flag.filter(F.col("is_outlier")).show()
# 步骤 4: 处理异常值 (例如,使用边界值进行替换/Capping)
df_capped = df_with_outlier_flag.withColumn("salary_capped",
F.when(F.col("salary") < F.col("lower_bound"), F.col("lower_bound"))
.when(F.col("salary") > F.col("upper_bound"), F.col("upper_bound"))
.otherwise(F.col("salary"))
)
print("
--- 对异常薪水进行封顶处理后的 DataFrame ---")
df_capped.select("name", "dept", "salary", "salary_capped", "is_outlier").show()
这个流程展示了如何组合使用 groupBy, agg, join 和 withColumn 来实现一个相对复杂的异常值处理逻辑。数据清洗是任何成功分析的基石,熟练掌握这些技术对于确保分析结果的准确性和可靠性至关重要。
4.5 用户定义函数(UDF):扩展 Spark 的最后一道门
尽管 PySpark 提供了极其丰富的内置函数库,但在某些特殊场景下,我们可能需要实现一些内置函数无法覆盖的、高度定制化的业务逻辑。例如:
调用一个复杂的、已有的 Python 业务逻辑库。
执行一些特殊的加密或解密算法。
应用一个预训练的机器学习模型进行预测。
在这些情况下,PySpark 允许我们创建并使用用户定义函数(User-Defined Functions, UDFs)。UDF 允许你将一个普通的 Python 函数“包装”起来,使其能够像 PySpark 内置函数一样,直接在 DataFrame 的 select 或 withColumn 操作中对列数据进行处理。
然而,在使用 UDF 之前,必须牢记一条黄金法则:UDF 是最后的选择,而不是第一选择。 如果一个功能可以用内置函数组合实现,就绝对不要使用 UDF。因为 UDF 会带来显著的性能开销,滥用 UDF 是导致 Spark 作业变慢的最常见原因之一。
4.5.1 创建和使用标准 Python UDF
一个标准的 UDF 包含两个步骤:
定义一个普通的 Python 函数。
使用 pyspark.sql.functions.udf() 将这个 Python 函数注册为一个 Spark UDF,并明确指定其返回类型。
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, IntegerType
spark = SparkSession.builder.appName("StandardUDF").getOrCreate()
# 准备数据
data = [
("product-a", 101, "category_1"),
("item-b", 205, "category_2"),
("widget-c", 310, "category_1")
]
df = spark.createDataFrame(data, ["product_name", "product_id", "category_code"])
df.show()
# 步骤 1: 定义一个普通的 Python 函数
# 假设我们的业务逻辑是:根据产品名称的前缀,判断其来源
def get_product_source(product_name: str) -> str:
"""一个简单的 Python 函数,用于解析产品来源"""
if not product_name:
return "Unknown"
if product_name.startswith("product-"):
return "Internal"
elif product_name.startswith("item-"):
return "Partner"
else:
return "External"
# 步骤 2: 将 Python 函数注册为 UDF,并指定返回类型
# `udf()` 的第一个参数是 Python 函数,第二个参数是该函数返回值的 Spark SQL 数据类型
get_source_udf = F.udf(get_product_source, StringType())
// 告诉 Spark,这个 UDF 会返回一个字符串
# 步骤 3: 在 DataFrame 操作中使用 UDF
# UDF 的调用方式和内置函数完全一样
df_with_source = df.withColumn("source", get_source_udf(F.col("product_name")))
// 将 UDF 应用于 'product_name' 列,并将结果存入名为 'source' 的新列
print("
--- 使用 UDF 添加了产品来源列 ---")
df_with_source.show()
# UDF 也可以接收多个列作为输入
def create_full_id(category_code: str, product_id: int) -> str:
"""将分类代码和产品ID组合成一个完整的ID"""
return f"{
category_code.upper()}-{
product_id}"
# 注册一个新的 UDF
create_full_id_udf = F.udf(create_full_id, StringType())
# 调用接收多列的 UDF
df_with_full_id = df.withColumn("full_product_id", create_full_id_udf(F.col("category_code"), F.col("product_id")))
print("
--- 使用 UDF 添加了完整产品ID列 ---")
df_with_full_id.show()
spark.stop()
4.5.2 性能之殇:UDF 内部的“跨国旅行”
为什么标准 UDF 的性能如此之差?答案在于其底层的执行机制,我们可以将其比喻为一次昂贵的“跨国旅行”。
Spark 的核心计算引擎(Tungsten)是在 JVM(Java 虚拟机) 中运行的。所有的数据在 JVM 内部都以高效的二进制格式存储和操作。而我们的 UDF 是一个 Python 函数,它必须在 Python 解释器中运行。
当你在 DataFrame 中调用一个标准 UDF 时,对于每一行数据,都会发生以下过程:
离开 JVM: 正在被处理的行数据,必须从 JVM 的内存中取出。
数据序列化 (Serialization): Spark 将这行数据(通常通过 Pickle 库)序列化成字节流。这是一个 CPU 密集型操作。
跨进程传输: 这些字节流通过一个 Socket 从 JVM 进程发送到与 Executor 关联的 Python 工作进程。
数据反序列化 (Deserialization): Python 工作进程接收到字节流,并将其反序列化成 Python 对象(如字符串、整数)。
执行 Python 函数: 你的 UDF 代码在 Python 解释器中被执行,处理这些 Python 对象。
结果序列化: UDF 的返回值(一个 Python 对象)再次被序列化成字节流。
跨进程传输: 结果字节流通过 Socket 从 Python 进程被发送回 JVM 进程。
结果反序列化: JVM 接收到字节流,并将其反序列化成 Spark 内部的数据格式。
回到 JVM: 数据最终回到 Spark 的执行引擎中,以进行后续操作。
这个过程涉及两次数据序列化、两次反序列化和两次进程间通信。对于一个有数百万行数据的 DataFrame,这个“旅行”就要重复数百万次,其累积的开销是巨大的。相比之下,内置函数的所有计算都发生在高效的、统一的 JVM 环境内部,没有任何跨进程的开销。
性能对比实验:
让我们通过一个实验来量化这个性能差异。我们将实现一个简单的功能:将两个数字相加。一次使用内置函数,一次使用 UDF。
import time
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import LongType
spark = SparkSession.builder.appName("UDFPerformanceTest").master("local[2]").getOrCreate()
# 创建一个包含大量行的数据
num_rows = 5_000_000
df = spark.range(num_rows).withColumn("id2", F.col("id") + 1)
df.cache() # 将 df 缓存,以排除数据生成的时间影响
df.count() # 触发动作以填充缓存
# --- 方案一:使用内置函数 ---
print("--- 开始测试:使用内置函数 ---")
start_time_builtin = time.time()
result_df_builtin = df.withColumn("sum", F.col("id") + F.col("id2"))
// 直接使用 '+' 操作符,这是一个高效的内置函数
result_df_builtin.write.format("noop").mode("overwrite").save() # 使用 noop format 写入来触发计算,避免 collect 的开销
// .write.format("noop") 是一个标准的技巧,用于强制执行一个动作而不产生任何实际的 I/O 输出
end_time_builtin = time.time()
print(f"内置函数耗时: {
end_time_builtin - start_time_builtin:.4f} 秒")
# --- 方案二:使用标准 Python UDF ---
def python_add(x, y):
return x + y
add_udf = F.udf(python_add, LongType())
// 注册一个用于加法的 UDF
print("
--- 开始测试:使用标准 Python UDF ---")
start_time_udf = time.time()
result_df_udf = df.withColumn("sum", add_udf(F.col("id"), F.col("id2")))
result_df_udf.write.format("noop").mode("overwrite").save()
end_time_udf = time.time()
print(f"标准 UDF 耗时: {
end_time_udf - start_time_udf:.4f} 秒")
spark.stop()
在典型的硬件上运行此代码,你会发现标准 UDF 的耗时可能是内置函数的 10 倍、50 倍甚至更多。这个实验直观地证明了 UDF 带来的巨大性能成本。
4.5.3 救赎之路:Pandas UDF(向量化 UDF)
为了解决标准 UDF 的性能瓶颈,Spark 引入了 Pandas UDF,也称为向量化 UDF(Vectorized UDFs)。其核心思想是用批处理代替单行处理,从而大幅减少 JVM 和 Python 之间的数据交换次数。
Pandas UDF 的工作原理:
Pandas UDF 借助 Apache Arrow 这个项目实现了革命性的性能提升。Apache Arrow 是一个跨语言的、用于内存中列式数据的开发平台。它的特点是定义了一套标准的、与语言无关的内存数据格式。
当使用 Pandas UDF 时:
Spark 在 JVM 端将一个数据分区(Partition)转换成 Arrow 的数据格式。
这些 Arrow 格式的数据被直接发送到 Python 进程。由于 JVM 和 Python 都理解 Arrow 格式,这个过程可以实现**零拷贝(Zero-Copy)**或极低成本的转换,无需昂贵的序列化。
在 Python 进程中,Arrow 数据被高效地转换成一个 pandas.Series 或 pandas.DataFrame。
你的 Pandas UDF 函数接收整个 pandas.Series(而不是单个值)进行计算。pandas 库本身就是为高效的向量化计算而设计的。
计算结果(一个 pandas.Series)被转换回 Arrow 格式。
Arrow 格式的数据被送回 JVM,并被高效地转换回 Spark 的内部数据格式。
通过这种方式,一次“跨国旅行”可以处理成千上万行数据,极大地摊薄了序列化和进程间通信的开销。
Pandas UDF 的类型与应用:
Pandas UDF 主要分为几类,最常用的是 SCALAR 和 GROUPED_MAP。
1. SCALAR Pandas UDF (Series to Series)
这是最常见的 Pandas UDF 类型。它接收一个或多个 pandas.Series 作为输入,并返回一个长度相同的 pandas.Series。它非常适合用于实现高效的特征工程。
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pandas as pd
from pyspark.sql.types import DoubleType
spark = SparkSession.builder.appName("ScalarPandasUDF").getOrCreate()
df = spark.range(1, 1_000_001).selectExpr("id as x", "id*2 as y")
// 创建一个包含两列的 DataFrame
# 需求:计算一个复杂的表达式 sqrt(x^2 + y^2),虽然可以用内置函数实现,但我们用它来演示
# 标准 UDF 写法 (为了对比)
def calc_hypot_standard(x, y):
import math
return math.sqrt(x*x + y*y)
hypot_udf = F.udf(calc_hypot_standard, DoubleType())
# Pandas UDF 写法
# 使用 @F.pandas_udf 装饰器来定义
@F.pandas_udf(DoubleType()) # 必须在装饰器中指定返回类型
def calc_hypot_pandas(x: pd.Series, y: pd.Series) -> pd.Series:
"""
这个函数接收的是 pandas.Series,而不是单个值
"""
# 我们可以利用 pandas 和 numpy 的向量化计算能力
import numpy as np
return np.sqrt(x*x + y*y)
# --- 性能对比 ---
# 内置函数 (作为基准)
start_time_b = time.time()
df.withColumn("hypot_builtin", F.sqrt(F.col("x")**2 + F.col("y")**2))
.write.format("noop").mode("overwrite").save()
end_time_b = time.time()
print(f"内置函数耗时: {
end_time_b - start_time_b:.4f} 秒")
# 标准 UDF
start_time_s = time.time()
df.withColumn("hypot_standard_udf", hypot_udf(F.col("x"), F.col("y")))
.write.format("noop").mode("overwrite").save()
end_time_s = time.time()
print(f"标准 UDF 耗时: {
end_time_s - start_time_s:.4f} 秒")
# Pandas UDF
start_time_p = time.time()
df.withColumn("hypot_pandas_udf", calc_hypot_pandas(F.col("x"), F.col("y")))
.write.format("noop").mode("overwrite").save()
end_time_p = time.time()
print(f"Pandas UDF 耗时: {
end_time_p - start_time_p:.4f} 秒")
# 你会发现 Pandas UDF 的性能远超标准 UDF,甚至可能接近内置函数的性能
spark.stop()
2. GROUPED_MAP Pandas UDF (groupBy().applyInPandas())
这是功能最强大、最灵活的 UDF 类型。它与 groupBy().apply() 范式结合,允许你对每个分组(Group)应用一个接收 pandas.DataFrame 并返回 pandas.DataFrame 的函数。这为实现复杂的分组操作(如分组内标准化、为每个分组训练一个模型等)打开了大门。
示例:按部门对员工年龄进行 Z-score 标准化
Z-score 标准化的公式是 (value - mean) / stddev。我们需要对每个部门计算其内部的均值和标准差,然后应用到该部门的每个员工。
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pandas as pd
spark = SparkSession.builder.appName("GroupedMapPandasUDF").getOrCreate()
# 准备数据
data = [
("Alice", "Sales", 34), ("Bob", "Sales", 45), ("Charlie", "Sales", 28),
("David", "IT", 25), ("Eve", "IT", 35), ("Frank", "IT", 40), ("Grace", "IT", 30)
]
df = spark.createDataFrame(data, ["name", "dept", "age"])
# 定义输出的 schema
# applyInPandas 返回的是一个 Spark DataFrame,因此必须明确其结构
result_schema = "name string, dept string, age int, age_zscore double"
# 定义 UDF 函数
# 它接收一个 pandas.DataFrame (代表一个分组)
# 必须返回一个 pandas.DataFrame,且列名和类型要与 result_schema 匹配
def zscore_normalize(pdf: pd.DataFrame) -> pd.DataFrame:
"""对一个 pandas DataFrame 中的 age 列进行 z-score 标准化"""
dept = pdf.dept.iloc[0] # 获取当前分组的部门名
mean_age = pdf.age.mean() # 计算该组的平均年龄
std_age = pdf.age.std() # 计算该组的年龄标准差
# 计算 z-score,并作为一个新列添加到 pandas DataFrame 中
# 如果标准差为0,则z-score为0,避免除以0的错误
if std_age > 0:
pdf['age_zscore'] = (pdf.age - mean_age) / std_age
else:
pdf['age_zscore'] = 0.0
return pdf # 返回处理后的 pandas DataFrame
# 使用 groupBy().applyInPandas()
zscore_df = df.groupBy("dept").applyInPandas(zscore_normalize, schema=result_schema)
// 1. Spark 按 'dept' 对数据进行分组
// 2. 对于每个分组 ('Sales', 'IT'),它会将该组的所有数据转换成一个 pandas.DataFrame
// 3. 将这个 pandas.DataFrame 传递给 zscore_normalize 函数
// 4. 函数返回一个新的 pandas.DataFrame
// 5. Spark 将返回的 pandas.DataFrame 转换回 Spark DataFrame 的分区
print("--- 按部门标准化年龄后的结果 ---")
zscore_df.show()
spark.stop()
UDF 使用总结与建议:
首选内置函数: 永远将内置函数作为第一选择。
避免不了时,首选 Pandas UDF: 如果必须使用自定义逻辑,优先考虑是否能用 Pandas UDF 实现。它的性能远优于标准 UDF。
标准 UDF 仅用于“万不得已”: 只有当逻辑极其复杂,无法向量化,且性能要求不高时,才考虑使用标准 Python UDF。
注意数据倾斜: 对于 GROUPED_MAP UDF,如果某个分组的数据量特别大(数据倾斜),那么处理这个分组的单个任务可能会成为整个作业的瓶颈。在使用前需要对数据分布有所了解。
第五部分:性能调优与运维精髓
第五章:Shuffle 的诅咒与救赎
在 Spark 的世界里,Shuffle 是所有性能问题的“万恶之源”。如果说 Spark 的高性能来自于其强大的并行计算能力,那么 Shuffle 就是对其并行能力的巨大阻碍。理解 Shuffle 的本质、识别其发生的原因并学会如何最大限度地避免和优化它,是 Spark 性能调优中最重要的课题。
5.1 什么是 Shuffle:分布式数据重组的昂贵代价
Shuffle,中文常译为“数据混洗”或“数据重组”,指的是 Spark 为了执行某些操作,需要在集群的 Executors 之间重新分发数据的过程。
想象一下,你有一副扑克牌,平均分给了四个玩家(四个 Executor)。现在,你需要每个玩家都把手上所有花色的牌分开,然后把所有红桃(Hearts)的牌都交给玩家A,所有黑桃(Spades)的牌都交给玩家B,以此类推。这个“把牌收集起来,按花色重新分发”的过程,就是一次 Shuffle。
在 Spark 中,当一个计算的输入和输出分区之间存在多对多的依赖关系时(即宽依赖),Shuffle 就不可避免。
触发 Shuffle 的典型操作:
repartition 和 coalesce: 这些是直接触发 Shuffle 的分区操作。repartition 会进行一次完整的 Shuffle 来将数据重新分区,而 coalesce 是一种优化的、避免完整 Shuffle 的缩减分区操作。
所有的 ByKey 操作: 如 reduceByKey, groupByKey, aggregateByKey, sortByKey。这些操作的本质就是要将具有相同 Key 的数据聚集到同一个分区来进行处理,这必然需要跨节点的数据移动。
所有的 join 操作: 当连接两个 DataFrame 时,Spark 需要确保具有相同连接键的行位于同一个分区,以便进行匹配。除了可以被优化为广播连接(Broadcast Join)的小表连接外,绝大多数 join 都会触发 Shuffle。
Shuffle 过程的四个阶段及其代价:
一个完整的 Shuffle 过程通常包含以下四个昂贵的阶段:
Map 阶段 (写出):
位置: 在 Shuffle 的上一个 Stage(例如,一个 map 操作之后紧跟着 reduceByKey)的每个 Executor 上。
动作: 每个 Task 计算完自己的输入分区后,并不是直接将结果发送到网络。相反,它会根据下一个 Stage 的分区逻辑(由 Partitioner 决定),将输出数据写入本地磁盘上的一个或多个文件中。每个文件对应下一个 Stage 的一个目标分区。
代价: 大量的磁盘 I/O。即使你的数据可以被缓存(cache()),Shuffle 的中间文件也通常需要写入磁盘,以保证在节点失败时能够恢复。
网络传输阶段:
位置: 集群网络。
动作: 下一个 Stage 的 Task 启动后,它会通过网络从上一个 Stage 的所有 Executor 上拉取(fetch)属于自己分区的那些数据文件。
代价: 大量的网络 I/O。如果数据量巨大,网络带宽会成为严重瓶颈。
Reduce 阶段 (合并与聚合):
位置: 在 Shuffle 的下一个 Stage 的每个 Executor 上。
动作: 每个 Task 会接收到来自不同节点的、多个属于自己分区的数据块。它需要将这些数据块合并起来,通常会进行排序(Sort)或聚合(Aggregate)操作,并将数据溢出(Spill)到磁盘以防止内存不足。
代价: 更多的磁盘 I/O 和 大量的 CPU 计算(用于反序列化、排序、聚合等)。
数据序列化/反序列化:
位置: 贯穿整个过程。
动作: 数据在写入磁盘和通过网络传输之前需要被序列化,在被读取和使用时需要被反序列化。
代价: CPU 密集型操作,特别是对于复杂的 Python 对象。
一个单一的 Shuffle 操作,就集齐了磁盘 I/O、网络 I/O 和 CPU 计算这三大性能杀手。因此,我们的核心优化目标就是:在保证业务逻辑正确的前提下,尽可能地减少 Shuffle 的次数,并减小每次 Shuffle 的数据量。
5.2 识别你的 Shuffle:Spark UI 的深度解读
在你开始任何优化之前,第一步是准确地识别出你的 Spark 作业中哪些部分发生了 Shuffle,以及它们的开销有多大。Spark Web UI 是进行这项诊断最强大的工具。
默认情况下,Spark Driver 会在 4040 端口启动一个 Web UI。你可以通过浏览器访问 http://<driver-node-ip>:4040 来查看。
在 Spark UI 中寻找 Shuffle 的踪迹:
Jobs 页面:
一个 Spark 作业(Job)对应代码中的一个 Action 操作。点击进入一个 Job 的详情页。
你会看到一个 DAG Visualization(有向无环图可视化)。图中的 Stage 会被蓝色的方框表示。Stage 之间的连线,就代表了一次 Shuffle。 如果你看到你的 DAG 被分成了多个 Stage,那么你的作业中就至少发生了一次 Shuffle。
Stages 页面:
这个页面列出了所有的 Stage。
关键指标:
Shuffle Read: 在 Reduce 阶段,该 Stage 从网络上读取的总数据量。
Shuffle Write: 在 Map 阶段,该 Stage 为下一个 Stage 写入本地磁盘的总数据量。
诊断: 如果你发现某个 Stage 的 Shuffle Read 或 Shuffle Write 的数据量非常巨大(例如,达到 GB 或 TB 级别),那么这个 Shuffle 就是一个主要的性能瓶颈。
点击进入某个 Stage 的详情页,你可以看到该 Stage 中所有 Task 的详细信息。如果发现某些 Task 的 Shuffle Read/Write 时间或数据量远超其他 Task,这通常是数据倾斜的信号。
代码示例:触发 Shuffle 并观察 UI
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import time
spark = SparkSession.builder.appName("ShuffleObserver").master("local[4]").getOrCreate()
# 准备数据
num_records = 10_000_000
df = spark.range(num_records).withColumn("group_key", (F.col("id") % 1000).cast("string"))
// 创建一个有一千万行的数据,并生成 1000 个不同的分组键
df.rdd.getNumPartitions() # 查看初始分区数
# 触发一次代价高昂的 Shuffle
# groupByKey 是一个典型的宽依赖操作
grouped_df = df.groupBy("group_key").count()
// 这个 groupBy 会触发一次 Shuffle
# 执行一个 Action 来提交作业
grouped_df.collect()
// collect() 会触发计算
print("作业已执行,请访问 Spark UI (通常在 http://localhost:4040) 查看。")
print("在 'Jobs' 页面找到最新的作业,点击进入详情,观察 DAG 图。")
print("在 'Stages' 页面,观察 'Shuffle Write' 和 'Shuffle Read' 的数据量。")
# 为了让程序保持运行,以便你有时间查看 UI
time.sleep(300) # 暂停 5 分钟
spark.stop()
运行这段代码后,打开 Spark UI,你会清晰地看到:
DAG 图被分成了两个 Stage。Stage 0 负责 range 和 withColumn,它的输出被作为 Shuffle Write。Stage 1 负责 groupBy 和 count,它从 Stage 0 进行 Shuffle Read。
Stages 页面会显示 Stage 0 写入了多少 Shuffle 数据,以及 Stage 1 读取了多少 Shuffle 数据。
通过这种方式,你可以将代码中的操作(如 groupBy)与 UI 中观察到的性能指标(Shuffle 数据量)精确地对应起来,从而定位到需要优化的代码段。
5.3 优化策略(一):避免 Shuffle
最优的策略是从根本上避免 Shuffle 的发生。虽然并非所有 Shuffle 都能避免,但很多不必要的 Shuffle 是由于糟糕的设计或对 Spark 机制的误解造成的。
1. 使用广播连接(Broadcast Join)
这是避免 join 操作发生 Shuffle 最有效的技术,我们在前面已经介绍过。当一个大表需要和一个小表(维度表)连接时,将小表广播到每个持有大表分区的 Executor 上,可以完全避免对大表的 Shuffle。
复习与深度思考:
阈值配置: spark.sql.autoBroadcastJoinThreshold (默认 10MB)。Spark 会自动广播小于这个阈值的表。但依赖自动广播并不总是可靠的,因为 Spark 在执行计划阶段可能无法准确估算表的大小。
主动广播: 使用 pyspark.sql.functions.broadcast() 函数包裹小表,是向优化器发出的一个强制指令。这是一种更明确、更可靠的策略。
风险: 广播的代价是将小表的所有数据先收集到 Driver,再由 Driver 分发给所有 Executor。如果小表“不够小”(比如几百MB),这可能会撑爆 Driver 的内存,或者在 Executor 端造成 OOM。
2. 使用窗口函数代替 groupBy + join
这是一个非常常见但容易被忽略的优化场景。假设你需要计算每个员工的薪水与他/她所在部门平均薪水的差异。
低效的方法:groupBy + join
# 假设 df 是包含 name, department, salary 的员工 DataFrame
# 1. 计算每个部门的平均薪水 (触发一次 Shuffle)
dept_avg_salary = df.groupBy("department").agg(F.avg("salary").alias("avg_salary"))
# 2. 将结果 join 回原始的 df (触发第二次 Shuffle)
df_with_diff_inefficient = df.join(dept_avg_salary, on="department", how="inner")
.withColumn("diff", F.col("salary") - F.col("avg_salary"))
df_with_diff_inefficient.show()
# 这个过程涉及两次 Shuffle,非常低效。
高效的方法:使用窗口函数
from pyspark.sql import Window
# 定义一个窗口,按部门分区
window_spec = Window.partitionBy("department")
# 使用 avg() 作为窗口函数
df_with_diff_efficient = df.withColumn("avg_salary", F.avg("salary").over(window_spec))
.withColumn("diff", F.col("salary") - F.col("avg_salary"))
df_with_diff_efficient.show()
# 整个过程只涉及一次数据扫描,没有 Shuffle!
# F.avg("salary").over(window_spec) 会在每个分区内计算,并将结果附加到每一行。
这个例子完美地展示了思维模式的转变。通过使用窗口函数,我们将一个需要两次 Shuffle 的复杂操作,转换成了一个无 Shuffle 的高效操作。
3. reduceByKey 优于 groupByKey
这个原则在 RDD API 中已经强调过,在 DataFrame API 中同样适用。当你使用 groupBy("key").agg(...) 时,Spark 的 Catalyst 优化器通常足够智能,能够实现与 reduceByKey 类似 Map-side 预聚合的效果。然而,如果你先 groupBy("key"),然后对 GroupedData 对象使用需要处理所有值的 UDF(例如通过 applyInPandas),那么 Map-side 预聚合就无法进行,其效果就类似于 groupByKey。
关键思想:始终优先选择那些可以被增量计算的聚合函数(sum, count, max, min, avg),因为它们都可以在 Map 端进行高效的预聚合,从而大幅减少 Shuffle 的数据量。
5.4 优化策略(二):减小 Shuffle 数据量
如果 Shuffle 无法避免,我们的下一个目标就是让每次 Shuffle 传输的数据尽可能少。
1. 在 Shuffle 前进行过滤(Filter before Shuffle)
这是最重要的原则之一。数据越早被过滤掉,参与后续昂贵操作(如 Shuffle 和 Join)的数据量就越少。Catalyst 优化器中的**谓词下推(Predicate Pushdown)**会自动地尝试将 filter 或 where 条件尽可能地移动到数据源端执行。
示例:
# 假设 parquet 文件是按日期分区的
spark.read.parquet("path/to/sales_data")
.filter("sale_date > '2023-01-01'") # 过滤条件
.join(customers_df, on="customer_id")
.groupBy("category")
.count()
.show()
在这个查询中,Catalyst 会将 filter("sale_date > '2023-01-01'") 这个条件“下推”到 Parquet 读取器。如果 Parquet 数据是按 sale_date 分区的,Spark 甚至可以只读取符合条件的分区目录,连文件都不用打开,极大地减少了初始加载的数据量,从而减少了后续 join 和 groupBy 需要 Shuffle 的数据量。
你的责任:虽然 Catalyst 很智能,但你也应该在逻辑上养成先 filter,后 join,再 groupBy 的编码习惯。
2. 只选择需要的列(Column Pruning)
在 Shuffle 之前,丢掉所有你不需要的列。列越少,每行数据占用的空间就越小,Shuffle 时写入磁盘和通过网络传输的数据总量也相应减小。
低效的做法:
transactions_df.join(users_df, on="user_id")
.groupBy("state")
.agg(F.sum("amount"))
.show()
# 如果 transactions_df 和 users_df 有很多列,这些列都会参与 join 和 shuffle,即使最后只用了 state 和 amount
高效的做法:
# 在 join 之前,先用 select 选出需要的列
users_subset = users_df.select("user_id", "state")
transactions_subset = transactions_df.select("user_id", "amount")
transactions_subset.join(users_subset, on="user_id")
.groupBy("state")
.agg(F.sum("amount"))
.show()
# Catalyst 的列裁剪(Column Pruning)优化通常也能做到这一点,但显式地 select 是一种更清晰、更可靠的编码风格。
5.5 优化策略(三):调整 Shuffle 参数
在某些情况下,通过调整 Spark 的配置参数,可以进一步优化 Shuffle 的性能。这些是更高级的调优手段,需要对 Spark 的内部机制有更深的理解。
1. spark.sql.shuffle.partitions
作用: 这个参数定义了在 Shuffle 之后,数据被重新分区的数量。它也是 groupBy, join 等操作产生的 DataFrame 的默认分区数。
默认值: 200。
调优指南:
如果值太小: 比如只有 10 个分区,但你有 100 个 CPU 核心,那么只有 10 个核心能参与计算,造成严重的资源浪费。每个分区的数据量可能过大,导致单个 Task 执行时间过长,甚至 OOM。
如果值太大: 比如有 5000 个分区,但数据总量很小。这会产生大量非常小的 Task。Spark 调度这些小 Task 的开销(每个 Task 启动都需要时间)可能会超过实际的计算时间。同时,Map 阶段会生成大量的 Shuffle 文件,给文件系统带来压力。
经验法则: 一个比较合理的设置是,让每个 Shuffle 后的分区数据大小在 100MB 到 200MB 之间。你可以通过观察 Stage 页面中的 Shuffle Read/Write 数据总量,然后除以你期望的分区大小,来估算一个合适的分区数。通常,这个值会设置为集群总 CPU 核心数的 2 到 3 倍。
如何设置:
# 在 SparkSession 中设置
spark.conf.set("spark.sql.shuffle.partitions", "500")
# 或者在代码中对特定 DataFrame 使用 repartition
df.repartition(500, "group_key").groupBy("group_key").count().show()
# 使用 repartition(N, key) 会根据 key 进行哈希分区,比单纯的 repartition(N) 更高效
2. spark.files.maxPartitionBytes
作用: 当Spark从文件中读取数据时,此参数指定了每个分区的最大字节数。
默认值: 128MB (与HDFS默认块大小一致)。
调优指南:
如果输入文件非常大但不可分割(例如,未压缩的CSV,gzip压缩的文件),Spark可能会为整个文件只创建一个分区,导致并行度极低。在这种情况下,你可以适当减小spark.files.maxPartitionBytes,强制Spark将大文件切分成更多的分区。
如果有很多小文件,可以适当增大此参数,让Spark将多个小文件合并到一个分区中处理,减少Task调度开销。
5.6 终极武器:数据倾斜(Data Skew)的定位与解决方案
在解决了基础的 Shuffle 问题之后,我们常常会遇到一个更隐蔽、也更棘手的敌人——数据倾斜(Data Skew)。数据倾斜是 Shuffle 性能问题中最极端、最致命的一种。
什么是数据倾斜?
数据倾斜指的是,在 Shuffle 过程中,由于数据分布不均,导致绝大部分数据被分配到了极少数几个分区(甚至一个分区)上,而其他大多数分区只处理了极少量的数据。
数据倾斜的后果:
“短板效应”: 一个 Spark Stage 的完成时间,取决于该 Stage 中最慢的那个 Task 的完成时间。当发生数据倾斜时,处理海量数据的少数 Task 会运行得极其缓慢,而其他处理少量数据的 Task 早已完成,处于空闲等待状态。这导致整个 Stage 被“拖慢”,集群的并行计算能力被严重浪费。
内存溢出(OOM): 那个接收了海量数据的“明星”Task,其所在的 Executor 内存很可能会被撑爆,导致任务失败(OOM),甚至整个 Executor 崩溃。
数据倾斜的根源:
数据倾斜的根本原因在于数据本身。通常发生在 groupBy、join 等操作中,某些**“热点 Key”**(Hot Key)的出现频率远高于其他 Key。例如:
在网站日志分析中,user_id 为 null 或 guest 的访客记录可能占了总数的很大一部分。
在电商交易数据中,某些爆款商品的 product_id 会被大量关联。
在社交网络分析中,某个明星的 user_id 会有海量的粉丝关系。
定位数据倾斜:
同样,Spark UI 是我们定位数据倾斜的利器。
Stages 页面:
进入一个疑似缓慢的 Stage 详情页。
排序任务列表: 按 Duration(耗时) 对 Task 列表进行降序排序。如果你看到最顶部的几个 Task 耗时远超中位数(例如,几分钟 vs. 几秒钟),这几乎可以断定发生了数据倾斜。
查看指标: 进一步查看这些慢任务的 Shuffle Read Size 或 Records Read。你会发现它们的值也远大于其他任务。
Executors 页面:
如果你观察到某个 Executor 的 Shuffle Read 或 Shuffle Write 指标异常高,或者其 Task Time (GC Time) 特别长,这说明这个 Executor 可能正在处理一个倾斜的 Task。
解决方案一:过滤掉导致倾斜的 Key
这是最简单直接的方法。很多时候,导致倾斜的 Key 本身就是无意义的“脏数据”,比如 null、-1、"" 等。
适用场景: 倾斜的 Key 对业务分析没有价值。
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.appName("SkewSolutionFilter").getOrCreate()
# 模拟一个含有大量 null key 的数据集
skewed_data = [("A", 1), ("B", 2), ("C", 3)] * 1000
+ [(None, 1)] * 1000000
df = spark.createDataFrame(skewed_data, ["key", "value"])
# 未经过滤的 groupBy,会非常慢,且可能失败
# df.groupBy("key").count().show()
# 解决方案:在 groupBy 之前,先过滤掉 null key
filtered_df = df.filter(F.col("key").isNotNull())
# 现在,在过滤后的、分布均匀的数据上进行 groupBy
# 这个操作会非常快
result = filtered_df.groupBy("key").count()
result.show()
spark.stop()
解决方案二:两阶段聚合(Two-Phase Aggregation)
这是一种通用且非常有效的处理倾斜 Key 的方法。其核心思想是:将倾斜的 Key 单独处理,然后将结果与正常 Key 的处理结果合并。
工作流程:
第一次聚合(加盐):
对原始 DataFrame,为每个 Key 拼接一个随机的后缀(称为“盐”,Salt),将其打散成多个新的 Key。例如,将倾斜的 Key hot_key 打散成 hot_key_1, hot_key_2, …, hot_key_N。
这样,原来集中在一个 Task 上的 hot_key 数据,就被分散到了 N 个不同的 Task 中去进行第一轮局部聚合。
第二次聚合(去盐):
对第一轮聚合的结果,去掉之前添加的随机盐后缀,恢复成原始的 Key。
再次进行一次聚合,将各个加盐子 Key 的局部聚合结果合并,得到最终的全局结果。
适用场景: 倾斜的 Key 是有业务价值的,不能被过滤掉。适用于 groupBy 聚合场景。
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.appName("SkewSolutionTwoPhaseAgg").config("spark.sql.shuffle.partitions", 4).getOrCreate()
# 模拟数据,'A' 是一个倾斜的 Key
skewed_data = [("A", 1)] * 100000 + [("B", 1)] * 100 + [("C", 1)] * 100
df = spark.createDataFrame(skewed_data, ["key", "value"])
# --- 直接聚合 (会导致倾斜) ---
# df.groupBy("key").sum("value").show()
# --- 两阶段聚合解决方案 ---
# 1. 识别出倾斜的 Key (在实际场景中,这可能需要一次预分析)
# 这里我们假设已经知道 'A' 是倾斜的
skewed_key = "A"
# 2. 将数据拆分成倾斜部分和非倾斜部分
skewed_df = df.filter(F.col("key") == skewed_key)
normal_df = df.filter(F.col("key") != skewed_key)
# 3. 处理倾斜部分 (两阶段聚合)
# 定义加盐的 UDF 或使用内置函数
salt_factor = 4 # 将倾斜的 Key 打散成 4 份
salted_skewed_df = skewed_df.withColumn("salted_key",
F.concat(F.col("key"), F.lit("_"), (F.rand() * salt_factor).cast("int"))
)
// 为倾斜key 'A' 添加随机后缀,如 'A_0', 'A_1', 'A_2', 'A_3'
salted_skewed_df.show(5, truncate=False)
# 阶段一聚合:在加盐的 Key 上进行局部聚合
partial_agg = salted_skewed_df.groupBy("salted_key").sum("value")
partial_agg.show()
# 阶段二聚合:去掉盐,进行最终聚合
desalted_agg = partial_agg.withColumn("original_key", F.split(F.col("salted_key"), "_")[0])
.groupBy("original_key")
.sum("sum(value)")
.withColumnRenamed("sum(sum(value))", "total_value")
.withColumnRenamed("original_key", "key")
print("
--- 倾斜 Key 的最终聚合结果 ---")
desalted_agg.show()
# 4. 处理非倾斜部分 (正常聚合)
normal_agg = normal_df.groupBy("key").sum("value").withColumnRenamed("sum(value)", "total_value")
print("
--- 非倾斜 Key 的聚合结果 ---")
normal_agg.show()
# 5. 合并两部分结果
final_result = desalted_agg.unionByName(normal_agg)
print("
--- 最终合并结果 ---")
final_result.show()
spark.stop()
解决方案三:倾斜 join 的特殊处理
对于 join 操作发生的数据倾斜,情况更为复杂,但思路是类似的。
场景:一个大表 fact_df 要 join 一个小一点的表 dim_df,但 fact_df 的连接键严重倾斜。
解决方案:将倾斜的维表部分单独处理
拆分维表 (dim_df): 将 dim_df 拆分成两部分:
skewed_dim_df: 只包含导致倾斜的那些 Key 的行。
normal_dim_df: 包含其余的正常 Key 的行。
复制和加盐事实表 (fact_df):
将 fact_df 中对应倾斜 Key 的行提取出来,形成 skewed_fact_df。
对 skewed_fact_df 的连接键进行随机加盐,与两阶段聚合类似。
广播和小表 join:
将非常小的 skewed_dim_df 广播出去。
将加盐后的 skewed_fact_df 与广播后的 skewed_dim_df 进行 join。由于事实表的 Key 已经被打散,这个 join 不会倾斜。
join 之后,去掉盐。
正常 join:
将 fact_df 中对应正常 Key 的部分与 normal_dim_df 进行一次常规的 join。
合并结果:
union 上面两个 join 的结果。
这个过程非常复杂,但在关键的 join 场景下是解决问题的“核武器”。幸运的是,一些现代的数据湖平台(如 Databricks)在它们的 Spark 版本(Photon 引擎)中内置了自适应查询执行(Adaptive Query Execution, AQE),其中就包含了自动处理数据倾斜的功能。AQE 可以在运行时动态地检测到倾斜,并自动地将倾斜的分区拆分成更小的子分区进行处理,极大地简化了开发者的调优工作。
5.7 数据格式与压缩:从源头提升性能
性能调优不仅仅是关于算法和参数,它始于你如何存储数据。选择正确的文件格式和压缩编解码器,可以在不修改任何一行 Spark 代码的情况下,将作业性能提升数倍。
行式存储 vs. 列式存储
行式存储 (Row-Oriented):
格式: CSV, JSON, Avro。
原理: 将一行数据的所有列连续地存储在一起。例如,(id1, name1, age1), (id2, name2, age2), ...
优点: 适合于需要获取整行数据的场景(OLTP 数据库常用)。写入新数据很快。
缺点: 对于分析查询(OLAP)是灾难性的。如果你只想查询 age 列的平均值,Spark 不得不读取每一行的所有数据(id, name, age),然后丢弃掉 id 和 name。这造成了巨大的 I/O 浪费。
列式存储 (Column-Oriented):
格式: Parquet, ORC。
原理: 将每一列的数据连续地存储在一起。例如,(id1, id2, ...), (name1, name2, ...), (age1, age2, ...)。
优点:
I/O 极高: 分析查询的绝佳选择。当查询 AVG(age) 时,Spark 只需要读取 age 这一列的数据,I/O 量可以减少几个数量级。这正是前面提到的谓词下推和列裁剪能够高效工作的基础。
高压缩率: 同一列的数据类型相同,具有相似的模式,因此非常容易被高效压缩。例如,一列全是整数的 age 比一行混杂着整数、字符串的记录更容易压缩。
缺点: 写入和更新操作比行式存储稍慢。
黄金法则:对于任何要在 Spark 中进行分析的数据,都应优先使用列式存储格式,首选是 Parquet。
Parquet:为分析而生的文件格式
Parquet 是 Spark 生态系统中的一等公民,也是事实上的标准。它除了列式存储的优点外,还包含了丰富的元数据:
每个文件内部都存储了模式信息(Schema)。
每个列块(Column Chunk)都存储了统计信息(如 min, max, count, null_count)。这使得 Spark 在执行 df.filter("age > 30") 这样的查询时,可以先读取元数据,如果某个列块的 max(age) 只有 25,那么 Spark 就可以直接跳过读取整个列块的数据,进一步减少 I/O。
压缩编解码器(Compression Codecs)
压缩是在 CPU 开销 和 I/O (磁盘/网络) 开销之间做出的权衡。通过消耗一些 CPU 来压缩数据,可以减少需要存储的磁盘空间和需要通过网络传输的数据量。
| Codec | 压缩比 | 压缩/解压速度 | 是否可分割 (Splittable) | 适用场景 |
|---|---|---|---|---|
| Snappy | 中等 | 非常快 | 是 | Spark 的默认和推荐选择。在速度和压缩比之间取得了绝佳的平衡。 |
| Gzip | 高 | 慢 | 否 | 当磁盘空间极其宝贵,且不关心 CPU 开销时。不可分割意味着 Spark 无法并行处理一个大的 Gzip 文件。 |
| LZO | 中等 | 快 | 是 (需索引) | 曾经流行,但现在 Snappy 更为通用。 |
| Zstandard (ZSTD) | 高 | 快 | 是 | 新一代的压缩算法,提供了与 Gzip 相当的压缩比和接近 Snappy 的速度,是未来的趋势。Spark 3.2.0+ 已支持。 |
实践指南:
转换数据格式: 如果你的原始数据是 CSV 或 JSON,强烈建议在数据处理流程的第一步,就将其转换为 Parquet 格式,并使用 Snappy 压缩。这个一次性的转换开销,将会在后续所有的分析查询中得到百倍的回报。
设置压缩方式:
# 在写入数据时指定
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")
df.write.format("parquet").mode("overwrite").save("path/to/output_snappy.parquet")
# 如果要使用 ZSTD
# spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
# df.write.parquet("path/to/output_zstd.parquet")
通过战略性地选择 Parquet + Snappy/ZSTD 的组合,你就为高性能的 Spark 作业打下了最坚实的基础,许多性能问题甚至在它们出现之前就已经被消除了。
5.8 小文件问题(Small File Problem):分布式文件系统的阿喀琉斯之踵
在处理大规模数据时,我们不仅要警惕 Shuffle 和数据倾斜,还要面对一个同样棘手但更常被忽视的问题——小文件问题。这个问题在数据湖、Hadoop HDFS 以及任何依赖分布式文件系统的计算引擎(如 Spark)中都普遍存在。
什么是小文件问题?
小文件问题指的是一个存储系统(如 HDFS 或 S3)中存在大量尺寸远小于其最佳块大小(Block Size)的文件。HDFS 的默认块大小通常是 128MB 或 256MB。如果一个目录下有成千上万个只有几 KB 或几 MB 的文件,就产生了小文件问题。
小文件问题的成因:
流式数据摄入: 许多实时数据管道(如 Kafka Connect, Flume)会以小批量、高频率的方式将数据写入数据湖,每一批次都可能生成一个小文件。
Spark 作业的输出: 一个 Spark Stage 的输出文件数量,等于该 Stage 的任务(Task)数量。如果你有一个分区数为 500 的 DataFrame,那么 df.write.parquet(...) 就会生成 500 个 Parquet 文件。如果每个分区处理的数据量很小,就会产生 500 个小文件。
动态分区写入: 当使用 df.write.partitionBy("colA", "colB")... 写入数据时,如果分区键(colA, colB)的组合基数非常高,就可能为每个组合都生成一个或多个小文件,导致文件数量爆炸性增长。
小文件为何是“问题”?
对 NameNode 的巨大压力 (HDFS):
在 HDFS 中,文件的所有元数据(包括文件名、路径、权限、块位置等)都存储在 NameNode 的内存中。每个文件,无论大小,都需要占用 NameNode 的一部分内存(大约 150 字节)。
如果有数亿个小文件,NameNode 的内存可能会被耗尽,成为整个集群的单点瓶颈,甚至导致集群崩溃。NameNode 的内存是 HDFS 最宝贵的资源。
Spark 任务启动开销:
Spark 在规划一个作业时,为每个文件(或文件的块)启动一个 Task 是一个常见的调度策略。
启动一个 Task 本身是有开销的:与 Driver 通信、序列化任务、分发到 Executor 等。
如果处理 1GB 的数据,是启动 8 个 Task(每个处理 128MB)高效,还是启动 1024 个 Task(每个处理 1MB)高效?答案显然是前者。处理大量小文件会导致 Spark 花在任务调度上的时间远超实际计算的时间。
I/O 性能下降:
从文件系统读取数据,需要先打开文件、寻址、然后读取。对于大量小文件,这种重复的“打开/寻址”操作的累积开销非常大。
相比于一次性顺序读取一个大文件,多次随机读取大量小文件会大大降低磁盘吞吐量。
定位小文件问题:
文件系统命令: 使用 hdfs dfs -ls -R /path/to/dir 或 aws s3 ls --recursive s3://bucket/path/ 来查看目录下的文件列表和大小。
Spark UI: 如果你发现一个 Stage 中有数量异常多的 Task,但每个 Task 的输入数据量(Input Size)又非常小(例如,只有几 KB),这通常是小文件问题的征兆。
解决方案一:在读取时合并(Read-Time Coalescing)
当数据源已经是小文件时,我们可以在读取它们时就进行合并。
1. spark.files.maxPartitionBytes
我们在前面提到过这个参数。通过增大这个值,你可以告诉 Spark 在创建分区时,将多个小文件打包进同一个分区进行处理。
from pyspark.sql import SparkSession
spark = SparkSession.builder
.appName("ReadTimeCoalesce")
.config("spark.files.maxPartitionBytes", "512mb") # 将每个分区的最大大小增加到 512MB
.config("spark.sql.files.openCostInBytes", "16mb") # 增大打开文件的开销估算,鼓励合并
.getOrCreate()
# 假设 'path/to/small_files' 目录下有大量小文件
df = spark.read.parquet("path/to/small_files")
# 打印分区数,你会发现它比不设置配置时要少
print(f"合并后读取的 DataFrame 分区数: {
df.rdd.getNumPartitions()}")
spark.stop()
2. 二进制文件读取器 (binaryFile)
这是一个更底层的接口,它会将每个文件读取为一个 (path, content) 的记录。这给了你完全的控制权,但使用起来更复杂,通常不用于常规的表格数据处理。
解决方案二:在写入时合并(Write-Time Compaction)
这是一种更主动、更根本的解决策略。与其等读取时再处理,不如在数据写入时就避免产生小文件。
1. repartition() 和 coalesce()
在执行 write 操作之前,先使用 repartition() 或 coalesce() 来控制输出文件的数量。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("WriteTimeCompaction").getOrCreate()
# 假设 df 是一个有 1000 个分区的大 DataFrame
df = spark.range(100_000_000).repartition(1000)
print(f"原始 DataFrame 分区数: {
df.rdd.getNumPartitions()}")
# --- 不好的写法:会产生 1000 个小文件 ---
# df.write.mode("overwrite").parquet("path/to/output_1000_files")
# --- 好的写法:使用 repartition 或 coalesce ---
# 假设我们希望输出大约 10 个文件
# repartition(N) 会进行一次完整的 Shuffle,确保输出正好是 N 个文件,且数据分布相对均匀
# 这适用于需要增加或大幅减少分区数的场景
df_repartitioned = df.repartition(10)
print(f"Repartition 后的分区数: {
df_repartitioned.rdd.getNumPartitions()}")
df_repartitioned.write.mode("overwrite").parquet("path/to/output_10_files")
# coalesce(N) 是一种优化的 repartition,它会尽量避免完整的 Shuffle
# 它通过合并现有的相邻分区来实现,因此只能用于减少分区数,且不能解决数据倾斜
# 如果你只是想减少文件数量,且分区数减少幅度不大,coalesce 更高效
df_coalesced = df.coalesce(100)
print(f"Coalesce 后的分区数: {
df_coalesced.rdd.getNumPartitions()}")
df_coalesced.write.mode("overwrite").parquet("path/to/output_100_files")
repartition vs. coalesce 的选择:
repartition(N): 需要进行一次全量的 Shuffle,开销较大。但它可以增加或减少分区数,并且可以重新打散数据,有助于缓解数据倾斜。
coalesce(N): 只会合并父 RDD 中的分区,不进行 Shuffle(或者说只进行本地的 Shuffle)。开销小,但只能用于减少分区数,并且如果原始数据是倾斜的,coalesce 后的数据仍然是倾斜的。
2. partitionBy() 时的考量
当你使用 df.write.partitionBy("colA", "colB")... 时,如果 (colA, colB) 的组合非常多,你仍然会面临小文件问题。解决方案是在 partitionBy 之前,先对数据进行一次 repartition。
# 假设 df 有 date, country, category, ... 等列
# 如果直接 partitionBy("country", "category"),可能会产生非常多的目录和文件
# 优化策略:在写入前,先按分区键进行 repartition
# 这会确保具有相同 (country, category) 的数据被 Shuffle到同一个分区,
# 从而在写入时,每个分区目录中的文件数量会大大减少。
df.repartition("country", "category")
.write
.partitionBy("country", "category")
.mode("overwrite")
.parquet("path/to/partitioned_output")
解决方案三:离线合并(Offline Compaction)
对于已经存在的大量小文件,或者对于无法在写入时进行干预的流式摄入场景,最佳实践是运行一个定期的、离线的合并作业(Compaction Job)。
这个作业的逻辑非常简单:
定时触发: 例如,每小时或每天运行一次。
读取小文件: Spark 作业读取一个特定时间窗口内(例如,过去一小时)的所有小文件。
合并: 使用 repartition 或 coalesce 将数据合并成少量的大文件。
替换: 将新生成的大文件“事务性地”替换掉原始的小文件。
这个过程是现代数据湖架构(如 Delta Lake, Apache Iceberg, Apache Hudi)的核心功能之一。这些数据湖格式提供了ACID 事务能力,它们的 OPTIMIZE 或 COMPACT 命令可以自动、安全地执行小文件合并,而不会影响正在读取这些数据的其他作业。
使用 Delta Lake 进行小文件合并的示例:
from pyspark.sql import SparkSession
from delta.tables import * # 导入 DeltaTable 类
spark = SparkSession.builder
.appName("DeltaCompaction")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.DeltaCatalog")
.getOrCreate()
# 假设我们有一个 Delta 表,它因为流式写入而产生了大量小文件
# df.write.format("delta").mode("append").save("/path/to/my_delta_table") # 假设这个操作被频繁调用
# --- 手动触发合并 ---
# 将路径转换为 DeltaTable 对象
delta_table = DeltaTable.forPath(spark, "/path/to/my_delta_table")
# 执行 OPTIMIZE 命令
# 这会读取小文件,并将它们合并成更大的文件(默认 1GB)
print("开始执行 Delta Lake 的 OPTIMIZE 命令...")
delta_table.optimize()
print("小文件合并完成。")
# 也可以使用 SQL
spark.sql("OPTIMIZE /path/to/my_delta_table")
# 查看表的历史记录,可以看到 OPTIMIZE 操作
delta_table.history().show()
spark.stop()
使用像 Delta Lake 这样的数据湖格式,是根治小文件问题的最现代化、最可靠的方案。它将复杂的离线合并逻辑封装成了简单的命令,并保证了操作的原子性和数据的一致性。
第六章:内存管理与执行器调优
在解决了数据层面的 Shuffle 和文件存储问题后,性能优化的下一站是资源层面。如何为你的 Spark 作业分配合理的计算资源?如何配置 Driver 和 Executor 的内存,以避免 OOM (OutOfMemoryError) 并最大化利用集群资源?这是将 Spark 作业从“能跑”提升到“跑得好”的关键一步。
6.1 Spark 内存模型深度剖析 (YARN)
要进行内存调优,首先必须理解 Spark Executor 的内存是如何被划分和使用的。Spark 的内存模型经过了多次演进,在 Spark 1.6+ 版本中,引入了**统一内存管理(Unified Memory Management)**模型,这是我们今天分析的基础。
一个 Executor 的总内存,由 spark.executor.memory 参数指定。但这块内存并不是一个整体,它被清晰地划分成了几个功能区:
![图片[1] - 【Python】PySpark数据分析 - 宋马](https://pic.songma.com/blogimg/20250610/236b72e0488546caa58450b31945eb9c.png&pos_id=img-PGSnaHp6-1749213942994)
(这是一个经典的 Spark 1.6+ 内存模型图)
JVM 堆内存 (JVM Heap): 这是 spark.executor.memory 直接控制的部分。它被进一步划分为:
Spark 内存 (Spark Memory): 这是 Executor 内存的核心区域,由 spark.memory.fraction 参数控制,默认为 0.6 (即 60%)。这部分内存被**执行内存(Execution Memory)和存储内存(Storage Memory)**动态共享。
执行内存 (Execution Memory): 用于满足任务执行期间的内存需求,主要是为 Shuffle、Join、Sort 和 Aggregation 等操作中的中间数据(如哈希表、缓冲区)提供存储空间。
存储内存 (Storage Memory): 用于存储用户通过 cache() 或 persist() 持久化的 RDD 和 DataFrame 分区数据。
动态占用机制: 这是统一内存管理的核心。执行内存和存储内存之间可以互相借用。当执行任务需要大量内存,而存储区空闲时,执行区可以临时“借用”存储区的空间。反之亦然。但是,执行内存有优先抢占权。如果一个任务需要内存,而存储区有缓存数据,Spark 会将缓存的数据块“驱逐”(Evict)到磁盘(如果存储级别允许),为任务执行腾出空间。
用户内存 (User Memory): 这部分内存由 1 - spark.memory.fraction 决定,默认为 0.4 (40%)。它用于存储用户自己定义的数据结构、UDF 中创建的对象,以及 Spark 内部的一些元数据。如果你的 UDF 创建了大量大的对象,就可能会耗尽这部分内存,导致 OOM。
预留内存 (Reserved Memory): 一个固定大小(硬编码为 300MB)的内存区域,用于存储 Spark 内部系统对象,防止它们被用户代码意外地 OOM 掉。
堆外内存 (Off-Heap Memory): 由 spark.memory.offHeap.enabled 和 spark.memory.offHeap.size 控制。如果启用,Spark 可以直接在 JVM 堆之外管理一部分内存。
优点:
减少 GC 开销: 堆外内存不受 JVM 垃圾回收(Garbage Collection, GC)的影响。对于需要大量、长生命周期内存缓存的场景,可以避免长时间的 GC 停顿(“Stop-the-World” GC)。
精确控制: 允许 Spark 的 Tungsten 引擎像 C++ 一样直接操作内存,性能更高。
缺点: 使用和调试更复杂。通常只在对 GC 停顿非常敏感的、极致性能调优的场景下使用。
一个完整的 Executor 内存计算 (YARN 模式下):
当你在 YARN 上运行 Spark 时,你通过 --executor-memory (对应 spark.executor.memory) 指定的是 JVM 堆内存。但 YARN 容器实际占用的物理内存会比这个值大。YARN 通过 spark.executor.memoryOverhead 参数来为 JVM 之外的开销(如 Python 进程、系统库、堆外内存等)申请额外的“开销内存”。
YARN 容器总内存 = spark.executor.memory + spark.executor.memoryOverhead
memoryOverhead 的默认值是 max(executorMemory * 0.10, 384MB)。这是一个非常重要的参数,如果你的 PySpark 作业使用了大量的 Pandas UDF 或其他 Python 库,默认的开销内存可能不够用,导致 YARN 因为容器超出了物理内存限制而直接杀死它(Container killed by YARN for exceeding memory limits)。
6.2 OOM (OutOfMemoryError) 的根源与诊断
OOM 是 Spark 作业中最常见的失败原因。但 OOM 并非只有一种,其根源可能出现在 Driver 端,也可能出现在 Executor 端。
1. Executor OOM:
这是最常见的 OOM 类型。
原因:
不合理的资源分配: spark.executor.memory 设置得太小。
单个分区数据量过大: repartition 或 spark.sql.shuffle.partitions 设置得太小,导致单个 Task 需要处理的数据量超过了 Executor 的执行内存。
数据倾斜: 某个 Task 处理了远超其他 Task 的数据量。
collect() 大数据: 在代码中对一个巨大的 DataFrame 调用了 collect(),试图将所有数据拉回 Driver,但在此之前,Executor 需要先将自己分区的数据物化到内存中,这可能导致 OOM。
高并发的 join 或 groupBy: 复杂的聚合或连接操作需要在内存中维护大型的哈希表或缓冲区。
不当的 cache(): 将一个巨大的 DataFrame cache() 到 MEMORY_ONLY,但 Executor 内存不足以容纳。
UDF 内存泄漏: Python UDF 中创建了大量对象,耗尽了用户内存(User Memory)或堆外内存(如果 Python 进程与 JVM 共享)。
诊断 (在 Executor 日志中寻找线索):
java.lang.OutOfMemoryError: Java heap space: 最直接的信号,JVM 堆内存耗尽。通常是执行内存或存储内存不足。
java.lang.OutOfMemoryError: GC overhead limit exceeded: JVM 花了超过 98% 的时间在 GC,但只回收了不到 2% 的内存。这表明堆里全是“活”对象,内存严重不足。
Container killed by YARN for exceeding memory limits: YARN 发现容器占用的物理内存(堆内存 + 开销内存)超出了申请的上限,于是强行杀掉了它。这通常是堆外内存或 Python 进程内存使用过多导致的,需要调高 spark.executor.memoryOverhead。
2. Driver OOM:
虽然不如 Executor OOM 常见,但一旦发生,通常更难排查。
原因:
collect() 大数据: 这是 Driver OOM 的头号元凶。df.collect() 会将集群中所有 Executor 上的所有数据都汇总到 Driver 的内存中。如果结果集有几十 GB,而 Driver 只有几个 GB 的内存,必然会导致 OOM。
broadcast() 大表: broadcast(df) 会先将 df 的所有数据收集到 Driver,然后再分发出去。如果广播的表过大,Driver 会 OOM。
维护过大的元数据: 如果 Spark 应用的 DAG 极其复杂,或者需要维护大量的广播变量,也可能给 Driver 带来内存压力。
创建巨大的本地对象: 在 Driver 端的 Python 代码中创建了非常大的对象,例如,sc.parallelize(very_large_python_list)。
诊断 (在 Driver 日志中寻找线索):
同样会看到 java.lang.OutOfMemoryError: Java heap space。关键是看日志的来源是 Driver 还是 Executor。
6.3 核心配置参数与调优实践
资源调优是一个“自顶向下”的过程:先确定总资源,再分配给 Executor,最后微调内部内存划分。
假设集群信息:
10 个工作节点(Worker Nodes)
每个节点:16 个 CPU 核心,64 GB 内存
第一步:确定 Executor 的数量和核数
--num-executors: 你希望启动多少个 Executor 进程。
--executor-cores: 每个 Executor 分配多少个 CPU 核心。
YARN 的资源预留: 每个工作节点都需要为操作系统和 Hadoop/YARN 守护进程(如 NodeManager, DataNode)预留一部分资源。通常预留 1 个核心 和 10-15% 的内存。
可用核心/节点 = 16 – 1 = 15
可用内存/节点 = 64GB * 0.85 ≈ 54GB
策略一:胖 Executor (Fat Executors)
思路: 给每个 Executor 分配尽可能多的核心,减少 Executor 的数量。
配置: --executor-cores 5 (一个 Executor 占用 5 个核心)。
计算:
每个节点可运行的 Executor 数 = 15 核心 / 5 核心/Executor = 3 个。
总 Executor 数 (--num-executors) = 10 节点 * 3 Executors/节点 = 30 个。
每个 Executor 可分配的内存 = 54GB / 3 Executors ≈ 18GB。
优点: 减少了 Executor 间的通信开销,因为更多的数据可以在同一个 JVM 内部共享(通过广播变量等)。
缺点: 如果 Executor 因为 GC 而暂停,会同时暂停 5 个 Task。对 HDFS 的并行读写能力可能受限。
策略二:瘦 Executor (Thin Executors)
思路: 每个 Executor 只分配少量核心(甚至 1 个),增加 Executor 的数量。这是一种过时的策略,因为 Executor 间通信和管理开销会很大。
推荐策略 (平衡策略):
根据 HDFS 吞吐量和经验,通常为每个 Executor 分配 4-5 个核心是一个很好的起点。 这在 CPU 计算效率和 HDFS I/O 吞吐量之间取得了良好的平衡。
所以,我们采用胖 Executor策略的计算结果。
第二步:确定 Executor 的内存
--executor-memory: 设置每个 Executor 的堆内存。
根据上一步的计算,每个 Executor 的可用物理内存是 18GB。我们不能把这 18GB 全部分给 --executor-memory,因为还需要 memoryOverhead。
计算 memoryOverhead = 18GB * 10% = 1.8GB。
计算 --executor-memory = 18GB - 1.8GB ≈ 16GB。
第三步:确定 Driver 的资源
--driver-memory: Driver 的内存大小。
--driver-cores: Driver 的核心数。
指南:
Driver 内存通常不需要像 Executor 那么大,除非你预见到会有 collect 或 broadcast 操作。4GB 到 8GB 通常是一个安全的起点。
Driver 核心数通常设置为 1 就足够了,除非 Driver 本身需要进行大量的并行计算(非常罕见)。
第四步:整合为 spark-submit 命令
spark-submit
--master yarn
--deploy-mode cluster # 'cluster' 模式下 Driver 运行在集群中,'client' 模式下 Driver 运行在提交节点
--num-executors 30
--executor-cores 5
--executor-memory 16G
--driver-memory 4G
--conf "spark.sql.shuffle.partitions=600" # 例如,总核心数的 2-3 倍 (30*5*2=300, 30*5*3=450, 600是一个更大的值)
--conf "spark.default.parallelism=600" # 设置 RDD 操作的默认并行度
--conf "spark.memory.fraction=0.75" # 如果作业是计算密集型,可以适当增加 Spark Memory 比例
my_pyspark_script.py
动态资源分配 (Dynamic Resource Allocation)
对于在共享集群上运行的、负载不均的长时间作业,动态资源分配是一个非常有用的功能。
启用: --conf spark.dynamicAllocation.enabled=true
配合设置: --conf spark.shuffle.service.enabled=true (需要在 YARN 上配置)
工作方式: Spark 会根据作业负载自动地增加或减少 Executor 的数量。当任务队列变长时,它会申请新的 Executor;当 Executor 空闲一段时间后(由 spark.dynamicAllocation.executorIdleTimeout 控制),它会被释放。
优点: 极大地提升了集群资源的利用率。
缺点: 对于短作业,申请和释放 Executor 的延迟可能会抵消其优势。
内存和执行器调优是一个需要结合理论知识和反复实验的迭代过程。从一个合理的基准配置开始,通过观察 Spark UI 和日志来诊断瓶颈,然后有针对性地调整参数,是通往高性能 PySpark 作业的必由之路。
第六部分:实战演练 – 构建端到端数据分析管道
第七章:案例研究(一):电商用户行为分析
在这个案例中,我们将扮演一家电商公司的数据分析师。我们的任务是分析用户的行为日志,以挖掘有价值的业务洞察,例如识别高价值用户、分析商品的热度和用户的购买路径等。
数据集设定:
我们将使用三个模拟的数据集:
用户行为日志 (user_behavior.json): 一个包含用户在 App 或网站上各种行为记录的 JSON 文件。每条记录代表一次用户行为。
user_id: 用户 ID (string)
event_time: 事件发生的时间戳 (string, ISO 8601 格式)
event_type: 事件类型 (string),如 view (浏览商品), add_to_cart (加入购物车), purchase (购买), search (搜索)
product_id: 商品 ID (string)
category_id: 商品类目 ID (string)
price: 商品价格 (double, 只在 purchase 事件中非空)
session_id: 会话 ID (string)
商品信息表 (products.csv): 包含商品详细信息的 CSV 文件。
product_id: 商品 ID (string)
product_name: 商品名称 (string)
brand: 品牌 (string)
用户信息表 (users.parquet): 包含用户注册信息的高效 Parquet 文件。
user_id: 用户 ID (string)
reg_time: 注册时间 (string, ISO 8601 格式)
channel: 注册渠道 (string), 如 App, Web, WeChat
7.1 阶段一:数据加载与初步探索(ETL – Extract)
数据分析的第一步总是从加载数据开始。我们需要从不同的数据源(JSON, CSV, Parquet)将数据读取为 Spark DataFrame,并进行初步的探索性数据分析(EDA),以了解数据的基本情况,如模式、数据量和基本统计信息。
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType, LongType
# --- 1. 初始化 SparkSession ---
# 在实际项目中,通常会配置更多的参数,但这里为了演示保持简洁
spark = SparkSession.builder
.appName("ECommerceAnalysis-ETL")
.config("spark.sql.shuffle.partitions", "50")
.config("spark.sql.session.timeZone", "UTC") # 统一时区为 UTC,避免时区问题
.getOrCreate()
# --- 2. 模拟生成数据文件 (在真实项目中,这些文件已存在于 HDFS 或 S3) ---
def setup_mock_data():
if not os.path.exists("data"):
os.makedirs("data")
# User behavior data (JSON)
with open("data/user_behavior.json", "w") as f:
f.write('{"user_id": "u001", "event_time": "2023-11-01T10:00:00Z", "event_type": "view", "product_id": "p001", "category_id": "c01"}
')
f.write('{"user_id": "u002", "event_time": "2023-11-01T10:01:00Z", "event_type": "view", "product_id": "p002", "category_id": "c02"}
')
f.write('{"user_id": "u001", "event_time": "2023-11-01T10:02:00Z", "event_type": "add_to_cart", "product_id": "p001", "category_id": "c01"}
')
f.write('{"user_id": "u001", "event_time": "2023-11-01T10:05:00Z", "event_type": "purchase", "product_id": "p001", "category_id": "c01", "price": 99.99}
')
f.write('{"user_id": "u003", "event_time": "2023-11-01T10:06:00Z", "event_type": "view", "product_id": "p003", "category_id": "c01"}
')
f.write('{"user_id": "u002", "event_time": "2023-11-01T10:07:00Z", "event_type": "add_to_cart", "product_id": "p002", "category_id": "c02"}
')
f.write('{"user_id": "u002", "event_time": "2023-11-01T10:10:00Z", "event_type": "purchase", "product_id": "p002", "category_id": "c02", "price": 149.50}
')
f.write('{"user_id": "u001", "event_time": "2023-11-02T11:00:00Z", "event_type": "view", "product_id": "p004", "category_id": "c03"}
')
# Product info (CSV)
with open("data/products.csv", "w") as f:
f.write("product_id,product_name,brand
")
f.write("p001,Quantum Laptop,TechCorp
")
f.write("p002,Galaxy Smartphone,ConnectX
")
f.write("p003,Silent Mouse,TechCorp
")
f.write("p004,Ergo Keyboard,Clicky
")
# User info (Parquet)
users_data = [("u001", "2023-01-15T08:00:00Z", "App"), ("u002", "2023-03-20T12:30:00Z", "Web"), ("u003", "2023-05-10T18:00:00Z", "App")]
users_df = spark.createDataFrame(users_data, ["user_id", "reg_time", "channel"])
users_df.write.mode("overwrite").parquet("data/users.parquet")
setup_mock_data()
print("模拟数据已生成。")
# --- 3. 加载数据 ---
# 对于生产环境,手动定义 Schema 是最佳实践,可以避免 inferSchema 带来的额外开销和潜在的类型错误
behavior_schema = StructType([
StructField("user_id", StringType(), True),
StructField("event_time", StringType(), True), # 先读作字符串,后续再转换
StructField("event_type", StringType(), True),
StructField("product_id", StringType(), True),
StructField("category_id", StringType(), True),
StructField("price", DoubleType(), True),
StructField("session_id", StringType(), True) # 假设 JSON 中有此字段,但我们的示例中没有
])
# 读取 JSON 文件
# JSON Lines 格式 (每行一个完整的 JSON 对象)
behavior_df = spark.read.schema(behavior_schema).json("data/user_behavior.json")
# 读取 CSV 文件
products_df = spark.read.option("header", "true").csv("data/products.csv")
# 读取 Parquet 文件
# Parquet 是自描述的,文件本身包含了 Schema,所以通常不需要手动指定
users_df = spark.read.parquet("data/users.parquet")
print("
--- 数据加载完成 ---")
# --- 4. 初步探索性数据分析 (EDA) ---
print("
--- 用户行为日志 (behavior_df) ---")
print(f"记录数: {
behavior_df.count()}") # count() 是一个 action,触发一次作业
behavior_df.printSchema() # 打印 Schema
behavior_df.show(5, truncate=False) # show() 也是一个 action
print("
--- 商品信息 (products_df) ---")
print(f"记录数: {
products_df.count()}")
products_df.printSchema()
products_df.show(5, truncate=False)
print("
--- 用户信息 (users_df) ---")
print(f"记录数: {
users_df.count()}")
users_df.printSchema()
users_df.show(5, truncate=False)
# 查看基本统计信息
print("
--- 行为日志中的数值列基本统计 (describe) ---")
behavior_df.describe(['price']).show()
// describe() 会计算 count, mean, stddev, min, max 等统计量
# 查看不同事件类型的分布
print("
--- 事件类型分布 (groupBy + count) ---")
behavior_df.groupBy('event_type').count().orderBy(F.col('count').desc()).show()
EDA 阶段的发现与思考:
behavior_df 中的 event_time 被读取为 StringType,我们需要在下一步将其转换为 TimestampType 以便进行时间维度的分析。
price 列只在 purchase 事件中存在,其他事件类型中为 null。
products_df 的列类型都被自动推断为 string,这对于当前数据是合适的。
数据量目前很小,但在真实场景中,count() 操作可能会很慢,我们需要意识到它是一个会触发全量扫描的 Action。
事件类型分布显示 view 事件最多,符合一般用户行为模式。
这个阶段的目标是建立对数据的基本认知,为后续的数据清洗和转换做好准备。
7.2 阶段二:数据清洗与转换(ETL – Transform)
原始数据往往是“脏”的,不适合直接用于分析。这个阶段,我们的核心任务是:
处理不一致的数据类型(如将字符串转为时间戳)。
处理缺失值。
创建新的派生列(特征工程)。
将多个 DataFrame 连接起来,形成一个统一的、宽格式的分析基础表。
# --- 1. 数据类型转换与列清理 ---
print("
--- 开始数据清洗与转换 ---")
# 对 behavior_df 进行处理
behavior_df_cleaned = behavior_df
.withColumn("event_ts", F.to_timestamp(F.col("event_time")))
.withColumn("event_date", F.to_date(F.col("event_ts")))
.drop("event_time") # 创建了新列后,可以丢弃掉旧的字符串列
# 对 users_df 进行处理
users_df_cleaned = users_df.withColumn("reg_ts", F.to_timestamp(F.col("reg_time")))
.drop("reg_time")
print("--- 清洗后的 behavior_df ---")
behavior_df_cleaned.printSchema()
behavior_df_cleaned.show(5, truncate=False)
# --- 2. 处理缺失值 ---
# 在我们的场景中,price 列的 null 是有业务含义的(非购买事件),暂时不填充。
# 我们需要处理 user_id 或 product_id 为 null 的情况,这里假设它们是脏数据。
behavior_df_cleaned = behavior_df_cleaned.na.drop(subset=["user_id", "product_id"])
print(f"
删除关键ID为null的记录后,行为日志剩余: {
behavior_df_cleaned.count()} 条")
# --- 3. 连接 DataFrame,构建统一视图 ---
# 为了得到一个包含所有信息的“大宽表”,我们将三个 DataFrame 连接起来。
# 这是分析的核心步骤。
# 步骤 3a: 将用户行为与用户信息连接起来
# 使用 left join,确保即使某个用户在 users_df 中没有信息(理论上不应该发生),其行为记录也不会丢失。
enriched_behavior_df = behavior_df_cleaned.join(
users_df_cleaned,
on="user_id",
how="left"
)
# 步骤 3b: 将上面的结果再与商品信息连接起来
# 同样使用 left join,保证即使商品信息缺失,行为记录仍在
master_df = enriched_behavior_df.join(
products_df,
on="product_id",
how="left"
)
# 连接后,最好进行一次缓存,因为这个 master_df 会被后续的多个分析任务反复使用
master_df.cache()
# 触发一次 action 来填充缓存
master_df.count()
print("
--- 构建完成的统一分析宽表 (master_df) ---")
master_df.printSchema()
master_df.show(5, truncate=False)
# --- 4. 派生新特征 (Feature Engineering) ---
# 这些新特征对于后续分析非常有价值
# 4a. 用户生命周期:用户从注册到本次事件发生所经过的天数
master_df = master_df.withColumn("days_since_reg", F.datediff(F.col("event_date"), F.to_date(F.col("reg_ts"))))
# 4b. 时间特征:从时间戳中提取出小时、星期几等信息
master_df = master_df.withColumn("event_hour", F.hour(F.col("event_ts")))
.withColumn("event_day_of_week", F.dayofweek(F.col("event_ts"))) # 1=周日, 2=周一, ...
print("
--- 添加了派生特征后的 master_df ---")
master_df.select(
"user_id", "event_type", "event_ts", "reg_ts",
"days_since_reg", "event_hour", "event_day_of_week"
).show(5)
# 至此,我们得到了一个干净、统一、信息丰富的 master_df。
# 它是后续所有分析的基础。
转换阶段的决策与考量:
连接策略: 在 join 操作中,我们都选择了 left join,以行为日志 behavior_df 作为主表。这是为了保证分析的完整性,即使用户或商品信息缺失,我们也不会丢失一次用户行为。
缓存(Caching): master_df 是通过两次 join 得到的,计算成本相对较高。由于后续的多个分析任务都会基于这个 DataFrame 进行,所以在其构建完成后立即 cache() 是一个非常重要的性能优化步骤。这避免了每次分析都要重复执行 join 操作。
特征工程: 我们派生了 days_since_reg, event_hour 等新特征。这些特征将用户行为置于了更丰富的上下文中(用户生命周期、时间维度),使得分析可以更加深入。例如,我们可以分析新老用户的行为差异,或者分析一天中不同时段的购买高峰。
7.3 阶段三:聚合分析与洞察挖掘(ETL – Load / Analysis)
有了干净的 master_df,现在我们可以开始回答具体的业务问题了。这个阶段的核心是使用 groupBy, agg, 窗口函数等工具进行聚合分析。
分析任务 1:核心业务指标(KPI)计算
我们需要计算一些宏观的业务指标,如日活跃用户(DAU)、日新增用户(DNU)、付费用户数(DPU)、总收入(Revenue)和客单价(AOV)。
# --- 分析任务 1: 计算每日核心 KPI ---
print("
--- 开始计算每日核心 KPI ---")
# 使用 groupBy 和多个聚合函数
daily_kpi = master_df.groupBy("event_date").agg(
F.countDistinct("user_id").alias("dau"), # countDistinct 计算独立用户数,即 DAU
F.sum(F.when(F.col("event_type") == "purchase", F.col("price")).otherwise(0)).alias("total_revenue"), # 只计算购买事件的价格总和
F.countDistinct(F.when(F.col("event_type") == "purchase", F.col("user_id"))).alias("dpu") # 计算独立付费用户数
).withColumn("aov", F.col("total_revenue") / F.col("dpu")) # 计算客单价 (Average Order Value)
# 计算日新增用户 (DNU)
# DNU 需要单独从 users_df 计算
daily_new_users = users_df_cleaned.withColumn("reg_date", F.to_date(F.col("reg_ts")))
.groupBy("reg_date")
.agg(F.countDistinct("user_id").alias("dnu"))
# 将 KPI 指标 join 在一起
final_kpi_df = daily_kpi.join(daily_new_users, daily_kpi.event_date == daily_new_users.reg_date, "left")
.select("event_date", "dau", "dnu", "dpu", "total_revenue", "aov")
.orderBy("event_date")
print("
--- 每日核心 KPI 指标 ---")
final_kpi_df.show()
分析任务 2:高价值用户识别 (RFM 模型简化版)
RFM 模型是用户价值分析的经典模型,代表 Recency(最近一次消费)、Frequency(消费频率)、Monetary(消费金额)。我们这里做一个简化,计算每个用户的总购买次数和总购买金额。
# --- 分析任务 2: 识别高价值用户 ---
print("
--- 开始进行用户价值分析 ---")
user_value_df = master_df.filter(F.col("event_type") == "purchase")
.groupBy("user_id", "channel")
.agg(
F.count("product_id").alias("purchase_frequency"), # 购买次数
F.sum("price").alias("total_monetary"), # 总购买金额
F.max("event_ts").alias("last_purchase_time") # 最近一次购买时间
)
.orderBy(F.col("total_monetary").desc())
print("
--- 用户购买力排名 (Top 10) ---")
user_value_df.show(10)
通过这个分析,运营团队可以快速定位到那些消费总额和购买频率都高的核心用户,并对他们进行精准的营销或 VIP 服务。
分析任务 3:商品热度与关联分析
我们需要知道哪些商品最受欢迎,以及购买了某个商品的用户还可能对哪些其他商品感兴趣。
# --- 分析任务 3: 商品热度分析 ---
print("
--- 开始进行商品分析 ---")
# 3a. 计算每个商品的浏览量、加购量、购买量
product_metrics_df = master_df.groupBy("product_id", "product_name", "brand").pivot("event_type", ["view", "add_to_cart", "purchase"])
.count()
.na.fill(0) # pivot 后可能会产生 null,用 0 填充
# 计算转化率:购买量 / 浏览量
product_metrics_df = product_metrics_df.withColumn("conversion_rate", F.col("purchase") / F.col("view"))
print("
--- 商品核心指标 (按购买量排序) ---")
product_metrics_df.orderBy(F.col("purchase").desc()).show()
# 3b. 简单的购物篮关联分析:找出购买了 A 商品的用户还购买了哪些其他商品
# 这需要一次自连接 (self-join)
print("
--- 简单的商品关联分析 ---")
# 筛选出所有购买记录
purchases_df = master_df.filter(F.col("event_type") == 'purchase')
.select("user_id", "product_id", "product_name")
# 创建两个别名以便自连接
df1 = purchases_df.alias("df1")
df2 = purchases_df.alias("df2")
# 自连接条件:同一个用户,但商品不同
# 并且为了避免重复 (A,B) 和 (B,A),我们让 df1.product_id < df2.product_id
co_purchase_df = df1.join(df2,
(df1.user_id == df2.user_id) & (df1.product_id < df2.product_id)
).groupBy(df1.product_name, df2.product_name)
.count()
.orderBy(F.col("count").desc())
print("
--- 共同购买次数最高的商品组合 ---")
co_purchase_df.show()
pivot 函数在这里非常有用,它将 event_type 列的行值转换成了列,使得数据更加直观。而自连接则揭示了商品之间的关联性,为推荐系统提供了基础数据支持。
分析任务 4:用户行为路径分析
我们想了解用户从浏览到购买的典型路径。这里我们使用窗口函数来追踪一个会话(Session)内用户的事件顺序。
# --- 分析任务 4: 用户行为路径分析 ---
print("
--- 开始进行用户会话路径分析 ---")
# 假设 session_id 已存在或可以被生成(例如,如果用户30分钟内无操作则认为是一个新会话)
# 为了演示,我们假设同一个用户在同一天的行为属于一个会话
master_df_with_session = master_df.withColumn("session_id", F.concat_ws("_", F.col("user_id"), F.col("event_date")))
# 使用窗口函数,在每个会话内按时间排序事件
session_window = Window.partitionBy("session_id").orderBy("event_ts")
# 使用 lag() 函数获取上一个事件类型
path_df = master_df_with_session.withColumn("prev_event_type", F.lag("event_type").over(session_window))
# 筛选出有意义的路径,例如,从"加入购物车"到"购买"
cart_to_purchase_paths = path_df.filter(
(F.col("prev_event_type") == "add_to_cart") & (F.col("event_type") == "purchase")
)
print("
--- 成功从'加购'转化为'购买'的路径次数 ---")
print(cart_to_purchase_paths.count())
cart_to_purchase_paths.select("user_id", "session_id", "product_id", "event_ts").show()
这个端到端的案例完整地展示了如何将一个模糊的业务需求,通过 PySpark 强大的数据处理能力,一步步地转化为具体、可量化、有价值的业务洞察。它融合了数据加载、清洗、转换、连接、聚合和高级分析(窗口函数)等所有核心技术。
最后,将分析结果落地(Load)
在得到分析结果 DataFrame 后,最后一步通常是将其写入到一个持久化的存储中,供下游应用(如报表系统、BI 工具、推荐引擎)使用。
# 例如,将 KPI 结果写入到一个新的 Parquet 文件中
# Parquet 是用于存储分析结果的理想格式
final_kpi_df.write.mode("overwrite").parquet("data/results/daily_kpi.parquet")
# 将用户价值分析结果写入数据库,供运营人员查询
# user_value_df.write
# .format("jdbc")
# .option("url", "jdbc:postgresql:dbserver")
# .option("dbtable", "user_value_analysis")
# .option("user", "username")
# .option("password", "password")
# .save()
# 在所有操作完成后,释放缓存和停止会话
master_df.unpersist()
spark.stop()

















暂无评论内容