← 返回首页
Checkpoint
发表时间:2023-10-18 06:55:53
Checkpoint

Checkpoint,是Spark提供的一个比较高级的功能。可以简单理解为是一种快照的机制。

1.如何使用checkPoint

对于特别复杂的Spark任务,有很高的风险会出现某个要反复使用的RDD因为节点的故障导致丢失,虽然之前持久化过,但是还是导致数据丢失了。那么也就是说,出现失败的时候,没有容错机制,所以当后面的transformation算子,又要使用到该RDD时,就会发现数据丢失了,此时如果没有进行容错处理的话,那么就需要再重新计算一次数据了。

何使用CheckPoint的步骤如下:

首先要调用SparkContext的setCheckpointDir()方法,设置一个容错的文件系统的目录,比如HDFS;然后,对RDD调用checkpoint()方法。

最后,在RDD所在的job运行结束之后,会启动一个单独的job,将checkpoint设置过的RDD的数据写入之前设置的文件系统中。

2.RDD之checkpoint流程

看下面这张图:

  1. SparkContext设置checkpoint目录,用于存放checkpoint的数据;
  2. 对RDD调用checkpoint方法,然后它就会被RDDCheckpointData对象进行管理,此时这个RDD的checkpoint状态会被设置为Initialized。
  3. 待RDD所在的job运行结束,会调用job中最后一个RDD的doCheckpoint方法,该方法沿着RDD的血缘关系向上查找被checkpoint()方法标记过的RDD,并将其checkpoint状态从Initialized设置为CheckpointingInProgress。
  4. 启动一个单独的job,来将血缘关系中标记为CheckpointInProgress的RDD执行checkpoint操作,也就是将其数据写入checkpoint目录。
  5. 将RDD数据写入checkpoint目录之后,会将RDD状态改变为Checkpointed;并且还会改变RDD的血缘关系,即会清除掉RDD所有依赖的RDD;
  6. 最后还会设置其父RDD为新创建的CheckpointRDD。

3.Checkpoint与持久化的区别

这里的checkpoint和我们之前讲的RDD持久化有什么区别吗?

建议:对需要checkpoint的RDD,先执行persist(StorageLevel.DISK_ONLY),因为默认情况下,如果某个RDD没有持久化,但是设置了checkpoint,那么这个时候,本来Spark任务已经执行结束了,但是由于中间的RDD没有持久化,在进行checkpoint的时候想要将这个RDD的数据写入外部存储系统的话,就需要重新计算这个RDD的数据,再将其checkpoint到外部存储系统中。如果对需要checkpoint的rdd进行了基于磁盘的持久化,那么后面进行checkpoint操作时,就会直接从磁盘上读取rdd的数据了,就不需要重新再计算一次了,这样效率就高了。

4.CheckPoint的使用

那我们来演示一下:将一个RDD的数据持久化到HDFS上面。

1).scala实现

package com.simoniu.scalademo

import org.apache.spark.{SparkConf, SparkContext}

/**
 * 需求:checkpoint的使用
 * Created by simoniu
 */

object CheckPointerScalaDemo {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("CheckPointOpScala")
    val sc = new SparkContext(conf)
    if (args.length == 0) {
      System.exit(100)
    }
    val outputPath = args(0)

    //1:设置checkpint目录
    sc.setCheckpointDir("hdfs://master:9000/chk001")

    val dataRDD = sc.textFile("hdfs://master:9000/test/hello_10000000.dat")
    //2:对rdd执行checkpoint操作
    dataRDD.checkpoint()
    dataRDD.flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile(outputPath)

    sc.stop()

  }
}

2).Java实现

package com.simoniu.sparkdemo.javademo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
 * 需求:checkpoint的使用
 * Created by simoniu
 */
public class CheckPointerJavaDemo {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("CheckPointOpJava");
        JavaSparkContext sc = new JavaSparkContext(conf);

        if (args.length == 0) {
            System.exit(100);
        }

        String outputPath = args[0];

        //1:设置checkpint目录
        sc.setCheckpointDir("hdfs://master:9000/chk002");

        JavaRDD<String> dataRDD = sc.textFile("hdfs://master:9000/test/hello_10000000.dat");
        //2: 对rdd执行checkpoint操作
        dataRDD.checkpoint();

        dataRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        }).mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1 + i2;
            }
        }).saveAsTextFile(outputPath);

        sc.stop();

    }
}

测试运行,先确保hadoop集群是正常运行的,以及hadoop中的historyserver进程和spark的historyserver进程也是正常运行的。

启动Hadoop的History Server:

cd $HADOOP_HOME
bin/mapred --daemon start historyserver

启动Spark的History Server:

cd $SPARK_HOME
sbin/start-history-server.sh

master主机进程如下:

[root@master spark]# jps
72656 SecondaryNameNode
73584 ResourceManager
34455 HistoryServer
119272 JobHistoryServer
47292 Jps
72031 NameNode

项目打包后上传到/root/examples下。编写运行脚本。

wordCountJob.sh

spark-submit \
--class com.simoniu.scalademo.CheckPointerScalaDemo  \
--master yarn \
--deploy-mode cluster \
--executor-memory 1G \
--num-executors 1 \
sparksubmitdemo.jar /out-chk001

提交任务。

[root@master spark]# sh -x wordCountJob.sh

执行成功之后可以到setCheckpointDir指定的目录中查看一下,可以看到目录中会生成对应的文件保存rdd中的数据,只不过生成的文件不是普通文本文件,直接查看文件中的内容显示为乱码。

[root@master spark]# hdfs dfs -ls /chk001/c5e3074b-7fde-4a54-87c8-5b3e27d9472e/rdd-1
Found 14 items
-rw-r--r--   2 root supergroup  134946607 2023-10-18 16:27 /chk001/c5e3074b-7fde-4a54-87c8-5b3e27d9472e/rdd-1/part-00000
-rw-r--r--   2 root supergroup  134946607 2023-10-18 16:28 /chk001/c5e3074b-7fde-4a54-87c8-5b3e27d9472e/rdd-1/part-00001
-rw-r--r--   2 root supergroup  134946607 2023-10-18 16:29 /chk001/c5e3074b-7fde-4a54-87c8-5b3e27d9472e/rdd-1/part-00002
-rw-r--r--   2 root supergroup  134946420 2023-10-18 16:30 /chk001/c5e3074b-7fde-4a54-87c8-5b3e27d9472e/rdd-1/part-00003
-rw-r--r--   2 root supergroup  134946607 2023-10-18 16:31 /chk001/c5e3074b-7fde-4a54-87c8-5b3e27d9472e/rdd-1/part-00004
-rw-r--r--   2 root supergroup  134946607 2023-10-18 16:32 /chk001/c5e3074b-7fde-4a54-87c8-5b3e27d9472e/rdd-1/part-00005
-rw-r--r--   2 root supergroup  134946420 2023-10-18 16:32 /chk001/c5e3074b-7fde-4a54-87c8-5b3e27d9472e/rdd-1/part-00006
-rw-r--r--   2 root supergroup  134946607 2023-10-18 16:33 /chk001/c5e3074b-7fde-4a54-87c8-5b3e27d9472e/rdd-1/part-00007
-rw-r--r--   2 root supergroup  134946607 2023-10-18 16:34 /chk001/c5e3074b-7fde-4a54-87c8-5b3e27d9472e/rdd-1/part-00008
-rw-r--r--   2 root supergroup  134946420 2023-10-18 16:35 /chk001/c5e3074b-7fde-4a54-87c8-5b3e27d9472e/rdd-1/part-00009
-rw-r--r--   2 root supergroup  134946607 2023-10-18 16:35 /chk001/c5e3074b-7fde-4a54-87c8-5b3e27d9472e/rdd-1/part-00010
-rw-r--r--   2 root supergroup  134946607 2023-10-18 16:36 /chk001/c5e3074b-7fde-4a54-87c8-5b3e27d9472e/rdd-1/part-00011
-rw-r--r--   2 root supergroup  134946420 2023-10-18 16:37 /chk001/c5e3074b-7fde-4a54-87c8-5b3e27d9472e/rdd-1/part-00012
-rw-r--r--   2 root supergroup  115894912 2023-10-18 16:38 /chk001/c5e3074b-7fde-4a54-87c8-5b3e27d9472e/rdd-1/part-00013