Spark提供了一种将RDD持久化的方式(cache、persist),这种方式适用于需要多次执行action操作的RDD,因为持久化之后的RDD中的内容不需要重新计算,可以直接使用,对于多次执行action的RDD来说,这样做能省下许多重复计算的时间。Task在启动之初读取一个分区的时候,会先判断这个分区是否已经被持久化,如果没有则会再去检查是否存在Checkpoint,还没有找到的话会根据血统重新计算。RDD的缓存是一种特殊的持久化操作,即RDD.cache()
等同于RDD.persist(MEMORY_ONLY)
即缓存是一种只将RDD持久化到内存当中的方式。本文基于Spark 2.1版本的源码对RDD的缓存过程进行了分析,文章中涉及到的源码文件主要有以下几个:
spark/core/src/main/scala/org/apache/spark/storage/memory/MemoryStorage.scala
spark/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
spark/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingVector.scala
spark/core/src/main/scala/org/apache/spark/util/collection/SizeTracker.scala
RDD缓存分析
RDD在缓存到内存之前,Partition中的数据一般以迭代器(Iterator)的数据结构来访问,通过Iterator可以获得分区中每一条序列化或者非序列化的Record,这些Record在访问的时候占用的是JVM堆内存中other部分的内存区域,同一个Partition的不同Record的空间并不是连续的。RDD被缓存之后,会由Partition转化为Block,并且存储位置变为了Storage Memory区域,并且此时Block中的Record所占用的内存空间是连续的。我们可以在Spark的源码当中多次看到unroll这个词,字面意思是展开,在Spark当中的意义就是将存储在Partition中的Record由不连续的存储空间转换为连续存储空间的过程。Unroll操作的时候需要在Storage Memory当中通过reserveUnrollMemoryForThisTask
来申请Unroll操作所需要的内存,使用完毕之后,又通过releaseUnrollMemoryForThisTask
方法来释放这部分内存。这与1.6.0版本之前固定Unroll内存的方式不同,是动态申请的,因为这部分内存只在Unroll的时候有用,动态申请这块内存能够在不需要Unroll的时候将这块内存区域用于其他的用途上,提升内存资源的利用率。Block有两种存储方式,分别为序列化存储和非序列化存储,这两种存储方式具有其对应的Entry,在MemoryStore类中通过一个LinkedHashMap
来存储堆内和对外内存中的所有Block对象的实例:
1 | // Note: all changes to memory allocations, notably putting blocks, evicting blocks, and |
通过这段源码的注释我们也可以知道,对这个map的数据结构进行操作的时候需要严格遵循同步的原则,因为一个Executor会对应一个MemoryStore,而一个Executor有多个core的时候会并行执行Task,就会有多个线程共享使用一块Storage Memory,即共享使用这一个LinkedHashMap,修改LinkedHashMap时需要做到同步。
因为不能保证存储空间可以一次容纳 Iterator 中的所有数据,当前的计算任务在 Unroll 时要向 MemoryManager 申请足够的 Unroll 空间来临时占位,空间不足则 Unroll 失败,空间足够时可以继续进行。至于为何要选择LinkedHashMap来存储也是有原因的,因为LinkedHashMap能够很好地支持LRU算法(最近最少使用,常用于页面置换算法),我们可以看到定义LinkedHashMap的第三个参数accessOrder=true
,即基于访问顺序,被访问到的元素会被加到LinkedHashMap的最后。基于这个特性,当新Block加入的时候发现内存空间不足的时候,会按照最近最少使用的顺序淘汰LinkedHashMap中的Block。
序列化存储
序列化存储使用了一个名为SerializedMemoryEntry的case class:
1 | private case class SerializedMemoryEntry[T]( |
这里的主要存储结构为ChunkedByteBuffer
,实际上这个类是Spark自己实现的用于存储ByteBuffer
的数据结构,其本质为Array[ByteBuffer]
,Array的每一个元素被称为一个chunk。对于已经序列化的Partition在转化为Block进行存储时,因为在存储时就已经知道序列化的ByteBuffer的size,其所需要的Unroll空间可以直接累加计算,一次申请。存储所使用的方法为putBytes
,需要输入Block的ID、占用的内存空间大小、存储模式为堆内内存还是堆外内存以及存放序列化数据的ByteBuffer,其返回的内容指示了是否缓存成功:
1 | def putBytes[T: ClassTag]( |
获取序列化缓存的内容可以直接使用getBytes
方法,输入Block的ID,获取对应的ChunkByteBuffer。