Common Action Operators
Published: 2023-10-10 17:17:59

This post walks through the commonly used Spark Action operators and demonstrates each of them in Scala and Java.

1. Common Action Operators

The commonly used Action operators are as follows:

Operator          Description
reduce            Aggregates all elements of the RDD into a single value
collect           Returns all elements of the RDD to the local client (the Driver)
count             Returns the total number of elements in the RDD
take(n)           Returns the first n elements of the RDD
saveAsTextFile    Saves the RDD elements to a file, calling toString on each element
countByKey        Counts the number of values for each key
foreach           Iterates over every element of the RDD
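
One point worth keeping in mind while reading the examples below: transformations are lazy, and it is the Action that actually triggers job execution. A minimal Scala sketch of this behavior, assuming an existing SparkContext named sc:

val doubled = sc.parallelize(Array(1, 2, 3)).map(_ * 2) //lazy: no job is submitted yet
val total = doubled.reduce(_ + _)                        //reduce is an Action: the job runs here
println(total)                                           //prints 12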

2. Scala Implementation

package com.simoniu.scalademo

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Demo: Action operators in practice
 * reduce: aggregate computation
 * collect: fetch all elements as a collection
 * take(n): fetch the first n elements
 * count: get the total number of elements
 * saveAsTextFile: save to a file
 * countByKey: count how many times each key appears
 * foreach: iterate over the elements
 *
 * Created by simoniu
 */
object ActionScalaDemo {

  def main(args: Array[String]): Unit = {
    val sc = getSparkContext
    //reduce: aggregate computation
    reduceOp(sc)
    //collect: fetch all elements
    collectOp(sc)
    //take(n): fetch the first n elements
    takeOp(sc)
    //count: get the total number of elements
    countOp(sc)
    //saveAsTextFile: save to a file
    saveAsTextFileOp(sc)
    //countByKey: count how many times each key appears
    countByKeyOp(sc)
    //foreach: iterate over the elements
    foreachOp(sc)

    sc.stop()
  }

  def foreachOp(sc: SparkContext): Unit = {
    val dataRDD = sc.parallelize(Array(1, 2, 3, 4, 5))
    //Note: foreach is not limited to println; that is only used here for testing.
    //In real work, when results must be written out to third-party storage,
    //foreach is where the code that sends data to the external system goes
    //(see the foreachPartition sketch after this method).
    dataRDD.foreach(println(_))
  }
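
  //A sketch of the pattern mentioned above: when writing results to external
  //storage, foreachPartition is usually preferred over foreach so that one
  //connection is opened per partition rather than per element. The sink calls
  //below are hypothetical placeholders, not a real client API.
  def foreachPartitionOp(sc: SparkContext): Unit = {
    val dataRDD = sc.parallelize(Array(1, 2, 3, 4, 5))
    dataRDD.foreachPartition { iter =>
      //val sink = ExternalSink.connect() //hypothetical: open one connection per partition
      iter.foreach(item => println(item)) //replace println with sink.write(item)
      //sink.close()
    }
  }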

  def countByKeyOp(sc: SparkContext): Unit = {
    val dataRDD = sc.parallelize(Array(("A", 1001), ("B", 1002), ("A", 1003), ("C", 1004)))
    //countByKey returns a Map
    val res = dataRDD.countByKey()
    for ((k, v) <- res) {
      println(k + "," + v)
    }
  }
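
  //countByKey returns the final Map to the Driver, so with a very large
  //number of distinct keys it can exhaust Driver memory. A minimal sketch of
  //the usual alternative, which keeps the counts distributed as an RDD:
  def reduceByKeyCountOp(sc: SparkContext): Unit = {
    val dataRDD = sc.parallelize(Array(("A", 1001), ("B", 1002), ("A", 1003), ("C", 1004)))
    val countsRDD = dataRDD.map(t => (t._1, 1L)).reduceByKey(_ + _)
    countsRDD.foreach(t => println(t._1 + "," + t._2))
  }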


  def saveAsTextFileOp(sc: SparkContext): Unit = {
    val dataRDD = sc.parallelize(Array(100, 200, 300, 400, 500))
    //Point this at an HDFS path; the target directory must not already exist
    dataRDD.saveAsTextFile("hdfs://master:9000/scalademo/out")
  }
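
  //saveAsTextFile fails if the output directory already exists. A minimal
  //sketch, assuming the Hadoop client classes are on the classpath, that
  //deletes the target directory before writing so the job can be re-run:
  def saveAsTextFileSafeOp(sc: SparkContext): Unit = {
    import org.apache.hadoop.fs.{FileSystem, Path}
    val outputPath = new Path("hdfs://master:9000/scalademo/out")
    val fs = FileSystem.get(outputPath.toUri, sc.hadoopConfiguration)
    if (fs.exists(outputPath)) {
      fs.delete(outputPath, true) //true = recursive delete
    }
    sc.parallelize(Array(100, 200, 300, 400, 500)).saveAsTextFile(outputPath.toString)
  }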

  def countOp(sc: SparkContext): Unit = {
    val dataRDD = sc.parallelize(Array(1, 2, 3, 4, 5))
    val res = dataRDD.count()
    println(res)
  }

  def takeOp(sc: SparkContext): Unit = {
    val dataRDD = sc.parallelize(Array(1, 2, 3, 4, 5))
    //Fetch the first 2 elements of the RDD
    val res = dataRDD.take(2)
    for (item <- res) {
      println(item)
    }
  }

  def collectOp(sc: SparkContext): Unit = {
    val dataRDD = sc.parallelize(Array(1, 2, 3, 4, 5))
    //collect returns an Array
    //Note: avoid collect on a large RDD, because all of the data is sent back
    //to the node where the Driver process runs
    //To peek at a few records and check the data format, use take(n) instead
    val res = dataRDD.collect()
    for (item <- res) {
      println(item)
    }
  }

  def reduceOp(sc: SparkContext): Unit = {
    val dataRDD = sc.parallelize(Array(1, 2, 3, 4, 5))
    val num = dataRDD.reduce(_ + _)
    println("sum=" + num)
  }
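
  //reduce throws an exception on an empty RDD, and its function must be
  //associative and commutative because partitions are combined in parallel.
  //A minimal sketch of fold, which takes a zero value and also works when
  //the RDD is empty:
  def foldOp(sc: SparkContext): Unit = {
    val dataRDD = sc.parallelize(Array(1, 2, 3, 4, 5))
    val num = dataRDD.fold(0)(_ + _)
    println("sum=" + num)
  }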

  /**
   * Creates the SparkContext
   */
  private def getSparkContext: SparkContext = {
    val conf = new SparkConf()
    conf.setAppName("ActionOpScala")
      .setMaster("local")
    new SparkContext(conf)
  }
}

3. Java Implementation

package com.simoniu.sparkdemo.javademo;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * Demo: Action operators in practice
 * reduce: aggregate computation
 * collect: fetch all elements as a collection
 * take(n): fetch the first n elements
 * count: get the total number of elements
 * saveAsTextFile: save to a file
 * countByKey: count how many times each key appears
 * foreach: iterate over the elements
 *
 * Created by simoniu
 */
public class ActionJavaDemo {

    public static void main(String[] args) {
        JavaSparkContext sc = getSparkContext();
        //reduce: aggregate computation
        reduceOp(sc);
        //collect: fetch all elements
        collectOp(sc);
        //take(n): fetch the first n elements
        takeOp(sc);
        //count: get the total number of elements
        countOp(sc);
        //saveAsTextFile: save to a file
        saveAsTextFileOp(sc);
        //countByKey: count how many times each key appears
        countByKeyOp(sc);
        //foreach: iterate over the elements
        foreachOp(sc);

        sc.stop();
    }


    private static void foreachOp(JavaSparkContext sc) {
        JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        dataRDD.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer i) throws Exception {
                System.out.println(i);
            }
        });
    }

    private static void countByKeyOp(JavaSparkContext sc) {
        Tuple2<String, Integer> t1 = new Tuple2<>("A", 1001);
        Tuple2<String, Integer> t2 = new Tuple2<>("B", 1002);
        Tuple2<String, Integer> t3 = new Tuple2<>("A", 1003);
        Tuple2<String, Integer> t4 = new Tuple2<>("C", 1004);
        JavaRDD<Tuple2<String, Integer>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
        //To call countByKey, first convert the JavaRDD to a JavaPairRDD with mapToPair
        Map<String, Long> res = dataRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<String, Integer> tup)
                    throws Exception {
                return new Tuple2<String, Integer>(tup._1, tup._2);
            }
        }).countByKey();
        for(Map.Entry<String,Long> entry: res.entrySet()){
            System.out.println(entry.getKey()+","+entry.getValue());
        }
    }
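
    //The same logic written with Java 8 lambdas, since PairFunction is a
    //functional interface; a minimal sketch assuming JDK 8 or later:
    private static void countByKeyLambdaOp(JavaSparkContext sc) {
        JavaRDD<Tuple2<String, Integer>> dataRDD = sc.parallelize(Arrays.asList(
                new Tuple2<>("A", 1001), new Tuple2<>("B", 1002),
                new Tuple2<>("A", 1003), new Tuple2<>("C", 1004)));
        Map<String, Long> res = dataRDD
                .mapToPair(tup -> new Tuple2<String, Integer>(tup._1, tup._2))
                .countByKey();
        res.forEach((k, v) -> System.out.println(k + "," + v));
    }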

    private static void saveAsTextFileOp(JavaSparkContext sc) {
        JavaRDD<String> dataRDD = sc.parallelize(Arrays.asList("java", "c", "c++", "html", "python"));
        dataRDD.saveAsTextFile("hdfs://master:9000/scalademo/out");
    }

    private static void countOp(JavaSparkContext sc) {
        JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        long res = dataRDD.count();
        System.out.println(res);
    }

    private static void takeOp(JavaSparkContext sc) {
        JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        List<Integer> res = dataRDD.take(2);
        for(Integer item : res){
            System.out.println(item);
        }
    }

    private static void collectOp(JavaSparkContext sc) {
        JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        List<Integer> res = dataRDD.collect();
        for(Integer item : res){
            System.out.println(item);
        }
    }

    private static void reduceOp(JavaSparkContext sc) {
        JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        Integer num = dataRDD.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1 + i2;
            }
        });
        System.out.println(num);
    }

    private static JavaSparkContext getSparkContext() {
        SparkConf conf = new SparkConf();
        conf.setAppName("ActionOpJava")
                .setMaster("local");
        return new JavaSparkContext(conf);
    }
}