需要使用缓存的场景
在使用 PySpark 的时候,经常会遇到如下场景:
- 存在一个经过复杂计算得到的 DataFrame,这个 DF 会在后续多次计算&使用,每次都会耗费我们的大量时间。
Spark 采用了 DAG 的计算流,直到一个实际的 Action 时才会真的发生运算,这在实际生产环境中,非常方便,由于其可以自动的对 DAG 进行优化,能够高效地促进计算效率。
但是在调试、测试中,就比较麻烦,或者在需要我们经常中途计算,输出一些内容时,就很麻烦,因为它重头开始计算太耗时了(会依据谱系图,重头计算)。
这时候,就比较适合使用 df.cache()
来对数据进行缓存,因为如果对 df 进行缓存,Spark 内部的调度程序,就可以截短 RDD 图的谱系
...在这种情况下,Spark 可以 ‘短路’,只根据已持久化的 RDD 开始计算。
除了 df.cache()
以为,还可以运用的方法有 df.persist()
关于存储级别
在 2.4.4 版本中,df.cache() && df.persist()
的默认存储级别为 MEMORY_AND_DISK
即将 DF 存储在内存中,内存中放不下的分区将被存储在磁盘上。
这块需要注意的点在于, df.cache()
本身也是一个 transform 而不是一个 action,即意味着它并不会立即开始运算,也不会立即生效,必须得再进行一次计算才可以。
df.persist()
区别于 df.cache()
还可以应用其他的存储级别,包括:
- MEMORY_ONLY 仅存储在内存中
- MEMORY_AND_DISK 优先存储在内存中,存储不下的分区放到磁盘上
- MEMORY_AND_SER 存储在内存中,但作为序列化的 JAVA 对象(在 PySpark 中应该是 Pickle 对象)
- MEMORY_AND_DISK_SER 作为序列化的 JAVA 对象存储在内存和磁盘中
- DISK_ONLY 存储在磁盘上
- ...
对于缓存的数据,如果缓存过多,导致内存占用较大,有的时候也比较头疼,这块就可以考虑对一些临时性的缓存进行释放,释放可以采用 df.unpersist()
以取消持久化。
关于 cache & persist 和 checkpoint 的区别
简单来说,cache 和 persist 是临时性的缓存,且虽然做了 cache 进行了短路,但是其 血缘
并没有忘记(一个简单的例子是,如果你后面操作除了错,会有一大堆报错提示)
而 chechpoint 则对 血缘
进行了完全的截断,并且持久化到了本地,是一直存在的,甚至可以被其他程序使用,这块有很大的区别,所以相对来说,还是慎用 checkpoint 比较好。