Spark 缓存与持久化


需要使用缓存的场景

在使用 PySpark 的时候,经常会遇到如下场景:

  • 存在一个经过复杂计算得到的 DataFrame,这个 DF 会在后续多次计算&使用,每次都会耗费我们的大量时间。

Spark 采用了 DAG 的计算流,直到一个实际的 Action 时才会真的发生运算,这在实际生产环境中,非常方便,由于其可以自动的对 DAG 进行优化,能够高效地促进计算效率。

但是在调试、测试中,就比较麻烦,或者在需要我们经常中途计算,输出一些内容时,就很麻烦,因为它重头开始计算太耗时了(会依据谱系图,重头计算)。

这时候,就比较适合使用 df.cache() 来对数据进行缓存,因为如果对 df 进行缓存,Spark 内部的调度程序,就可以截短 RDD 图的谱系

...在这种情况下,Spark 可以 ‘短路’,只根据已持久化的 RDD 开始计算。

除了 df.cache() 以为,还可以运用的方法有 df.persist()

关于存储级别

在 2.4.4 版本中,df.cache() && df.persist() 的默认存储级别为 MEMORY_AND_DISK 即将 DF 存储在内存中,内存中放不下的分区将被存储在磁盘上。

这块需要注意的点在于, df.cache() 本身也是一个 transform 而不是一个 action,即意味着它并不会立即开始运算,也不会立即生效,必须得再进行一次计算才可以。

df.persist() 区别于 df.cache() 还可以应用其他的存储级别,包括:

  • MEMORY_ONLY 仅存储在内存中
  • MEMORY_AND_DISK 优先存储在内存中,存储不下的分区放到磁盘上
  • MEMORY_AND_SER 存储在内存中,但作为序列化的 JAVA 对象(在 PySpark 中应该是 Pickle 对象)
  • MEMORY_AND_DISK_SER 作为序列化的 JAVA 对象存储在内存和磁盘中
  • DISK_ONLY 存储在磁盘上
  • ...

对于缓存的数据,如果缓存过多,导致内存占用较大,有的时候也比较头疼,这块就可以考虑对一些临时性的缓存进行释放,释放可以采用 df.unpersist() 以取消持久化。

关于 cache & persist 和 checkpoint 的区别

简单来说,cache 和 persist 是临时性的缓存,且虽然做了 cache 进行了短路,但是其 血缘 并没有忘记(一个简单的例子是,如果你后面操作除了错,会有一大堆报错提示)

而 chechpoint 则对 血缘 进行了完全的截断,并且持久化到了本地,是一直存在的,甚至可以被其他程序使用,这块有很大的区别,所以相对来说,还是慎用 checkpoint 比较好。