← 返回首页
RDD持久化
发表时间:2023-10-14 14:53:55
RDD持久化

RDD持久化。

1.RDD持久化原理

Spark中有一个非常重要的功能就是可以对RDD进行持久化。当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition数据持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存中缓存的partition数据。这样的话,针对一个RDD反复执行多个操作的场景,就只需要对RDD计算一次即可,后面直接使用该RDD,而不需要反复计算多次该RDD。因为正常情况下这个RDD的数据使用过后内存中是不会一直保存的。

例如这样的操作:针对mapRDD需要多次使用的:

val dataRDD = sc.parallelize(Array(1,2,3,4,5))
val mapRDD = dataRDD.map(...)
mapRDD.foreach(...)
mapRDD.saveAsTextFile(...)
mapRDD.collect()

巧妙使用RDD持久化,在某些场景下,对spark应用程序的性能有很大提升。特别是对于迭代式算法和快速交互式应用来说,RDD持久化,是非常重要的。要持久化一个RDD,只需要调用它的cache()或者persist()方法就可以了。

cache()和persist()的区别在于: cache()是persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,也就是调用persist(MEMORY_ONLY),将数据持久化到内存中。

如果需要从内存中清除缓存,那么可以使用unpersist()方法。

2.RDD持久化策略

下面是目前Spark支持的一些持久化策略。

策略                  介绍
-------------------------------------------------------------------------------------------------
MEMORY_ONLY                 以非序列化的方式持久化在JVM内存中
MEMORY_AND_DISK                     同上,但是当某些partition无法存储在内存中时,会持久化到磁盘中
MEMORY_ONLY_SER                     同MEMORY_ONLY,但是会序列化
MEMORY_AND_DISK_SER                 同MEMORY_AND_DSK,但是会序列化
DISK_ONLY                   以非序列化的方式完全存储到磁盘上
MEMORY_ONLY_2、MEMORY_AND_DISK_2等    尾部加了2的持久化级别,表示会将持久化数据复制一份,保存到其他节点

补充说明: - MEMORY_ONLY:以非序列化的Java对象的方式持久化在JVM内存中。如果内存无法完全存储RDD所有的partition,那么那些没有持久化的partition就会在下一次需要使用它的时候,重新被计算。 - MEMORY_AND_DISK:当某些partition无法存储在内存中时,会持久化到磁盘中。下次需要使用这些partition时,需要从磁盘上读取,不需要重新计算 - MEMORY_ONLY_SER:同MEMORY_ONLY,但是会使用Java的序列化方式,将Java对象序列化后进行持久化。可以减少内存开销,但是在使用的时候需要进行反序列化,因此会增加CPU开销。 - MEMORY_AND_DISK_SER:同MEMORY_AND_DSK。但是会使用序列化方式持久化Java对象。 - DISK_ONLY:使用非序列化Java对象的方式持久化,完全存储到磁盘上。 - MEMORY_ONLY_2、MEMORY_AND_DISK_2等:如果是尾部加了2的持久化级别,表示会将持久化数据复制一份,保存到其它节点,从而在数据丢失时,不需要重新计算,只需要使用备份数据即可。

3.如何选择RDD持久化策略

Spark提供了多种持久化级别,主要是为了在CPU和内存消耗之间进行取舍。

下面是一些通用的持久化级别的选择建议: 1. 优先使用MEMORY_ONLY,纯内存速度最快,而且没有序列化不需要消耗CPU进行反序列化操作,缺点就是比较耗内存。 2. MEMORY_ONLY_SER,将数据进行序列化存储,纯内存操作还是非常快,只是在使用的时候需要消耗CPU进行反序列化。

注意: 如果需要进行数据的快速失败恢复,那么就选择带后缀为_2的策略,进行数据的备份,这样在失败时,就不需要重新计算了。能不使用DISK相关的策略,就不要使用,因为有的时候,从磁盘读取数据,还不如重新计算一次。

4.使用RDD的持久化案例

1).scala实现

不使用cache统计两次RDD操作用时。

package com.simoniu.scalademo

import org.apache.spark.{SparkConf, SparkContext}

/**
 * 需求:RDD持久化
 * Created by simoniu
 */
object PersistRddScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("PersistRddScala")
      .setMaster("local")
    val sc = new SparkContext(conf)

    val dataRDD = sc.textFile("D:\\uploadFiles\\hello_10000000.dat")
    //val dataRDD = sc.textFile("D:\\uploadFiles\\hello_10000000.dat").cache()

    var start_time = System.currentTimeMillis()
    var count = dataRDD.count()
    println(count)
    var end_time = System.currentTimeMillis()
    println("第一次耗时:" + (end_time - start_time))

    start_time = System.currentTimeMillis()
    count = dataRDD.count()
    println(count)
    end_time = System.currentTimeMillis()
    println("第二次耗时:" + (end_time - start_time))

    sc.stop()
  }
}


10000000
第一次耗时:3903

10000000
第二次耗时:3626

使用cache之后。

package com.simoniu.scalademo

import org.apache.spark.{SparkConf, SparkContext}

/**
 * 需求:RDD持久化
 * Created by simoniu
 */
object PersistRddScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("PersistRddScala")
      .setMaster("local")
    val sc = new SparkContext(conf)

    //val dataRDD = sc.textFile("D:\\uploadFiles\\hello_10000000.dat")
    val dataRDD = sc.textFile("D:\\uploadFiles\\hello_10000000.dat").cache()

    var start_time = System.currentTimeMillis()
    var count = dataRDD.count()
    println(count)
    var end_time = System.currentTimeMillis()
    println("第一次耗时:" + (end_time - start_time))

    start_time = System.currentTimeMillis()
    count = dataRDD.count()
    println(count)
    end_time = System.currentTimeMillis()
    println("第二次耗时:" + (end_time - start_time))

    sc.stop()
  }
}

10000000
第一次耗时:8468

10000000
第二次耗时:220

加上cache之后,第二次计算耗时明显很少了很多。

2).Java实现

package com.simoniu.sparkdemo.javademo;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * 需求:RDD持久化
 * Created by simoniu
 */
public class PersistRddJava {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("PersistRddJava")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> dataRDD = sc.textFile("D:\\uploadFiles\\hello_10000000.dat").cache();

        long start_time = System.currentTimeMillis();
        long count = dataRDD.count();
        System.out.println(count);
        long end_time = System.currentTimeMillis();
        System.out.println("第一次耗时:"+(end_time-start_time));

        start_time = System.currentTimeMillis();
        count = dataRDD.count();
        System.out.println(count);
        end_time = System.currentTimeMillis();
        System.out.println("第二次耗时:"+(end_time-start_time));

        sc.stop();
    }
}

10000000
第一次耗时:8756

10000000
第二次耗时:260