← 返回首页
Transformation和Action
发表时间:2023-10-09 15:05:44
Transformation和Action

Spark对RDD的操作可以整体分为两类:Transformation和Action。

1.Transformation和Action

Transformation可以翻译为转换,表示是针对RDD中数据的转换操作,主要会针对已有的RDD创建一个新的RDD:常见的有map、flatMap、filter等等。 Action可以翻译为执行,表示是触发任务执行的操作,主要对RDD进行最后的操作,比如遍历、reduce、保存到文件等,并且还可以把结果返回给Driver程序。不论是Transformation里面的操作还是Action里面的操作,我们一般会把它们称之为算子,例如:map算子、reduce算子。

其中Transformation算子有一个特性:lazy lazy特性在这里指的是,如果一个spark任务中只定义了transformation算子,那么即使你执行这个任务,任务中的算子也不会执行。也就是说,transformation是不会触发spark任务的执行,它们只是记录了对RDD所做的操作,不会执行。只有当transformation之后,接着执行了一个action操作,那么所有的transformation才会执行。Spark通过lazy这种特性,来进行底层的spark任务执行的优化,避免产生过多中间结果。

Action的特性:执行Action操作才会触发一个Spark 任务的运行,从而触发这个Action之前所有的Transformation的执行。

以我们的WordCount代码为例来分析Transformation和Action。

    //第一步:创建SparkContext
    val conf = new SparkConf()
    conf.setAppName("WordCountScala")//设置任务名称
    //.setMaster("local")//local表示在本地执行
    val sc = new SparkContext(conf)

    //第二步:加载数据
    var path = "D:\\uploadFiles\\hello.txt"
    if(args.length==1){
      path = args(0)
    }

   //这里通过textFile()方法,针对外部文件创建了一个RDD,linesRDD,实际上,程序执行到这里为止, 
    hello.txt文件的数据是不会被加载到内存中的。linesRDD只是代表了一个指向hello.txt文件的引用
    val linesRDD = sc.textFile(path)

    //第三步:对数据进行切割,把一行数据切分成一个一个的单词
    //这里通过flatMap算子对linesRDD进行了转换操作,把每一行数据中的单词切开,获取了一个转换后的wordsRDD,但是由于linesRDD目前是没有数据的,现在不会做任何操作,只是进行了逻辑上的定义而已,最终生成的wordsRDD也只是一个逻辑上的RDD,此时里面并没有任何数据
    val wordsRDD = linesRDD.flatMap(_.split(" "))

    //第四步:迭代words,将每个word转化为(word,1)这种形式
    //这个操作和前面分析的flatMap的操作是一样的,最终获取了一个逻辑上的pairRDD,此时里面也是没有任何数据的
    val pairRDD = wordsRDD.map((_,1))

    //第五步:根据key(其实就是word)进行分组聚合统计
    //这个操作也是和前面分析的flatMap操作是一样的,最终获取了一个逻辑上的wordCountRDD,此时里面也是没有任何数据的
    val wordCountRDD = pairRDD.reduceByKey(_ + _)

    //第六步:将结果打印到控制台
    //这行代码执行了一个action操作,foreach,此时会触发之前所有transformation算子的执行,Spark会将这些算子拆分成多个task发送到多个机器上并行执行,这个foreach算子是没有返回值的,所以不会向Driver进程返回数据,如果是reduce操作,则会向Driver进程返回最终的结果数据。
    //注意:只有当任务执行到这一行代码的时候任务才会真正开始执行计算,如果任务中没有这一行代码,前面的所有算子是不会执行的
    wordCountRDD.foreach(wordCount=>println(wordCount._1+"--"+wordCount._2))

    //第七步:停止SparkContext
    sc.stop()