Common Actions
The commonly used Action operators are:
Operator        Description
reduce          Aggregates all elements of the RDD
collect         Returns all elements of the RDD to the local client (the Driver)
count           Returns the total number of elements in the RDD
take(n)         Returns the first n elements of the RDD
saveAsTextFile  Saves the RDD elements to a file, calling toString on each element
countByKey      Counts the number of values for each key
foreach         Iterates over each element of the RDD
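All of these operators are actions: unlike transformations such as map and filter, which are lazy, invoking an action triggers actual job execution. A minimal sketch (assuming an existing SparkContext named sc):

val mapped = sc.parallelize(Array(1, 2, 3)).map(_ * 2) // transformation: lazy, no job runs yet
val total = mapped.count()                             // action: triggers the job and returns 3

The two complete demos below exercise each action, first in Scala and then in Java.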
package com.simoniu.scalademo
import org.apache.spark.{SparkConf, SparkContext}
object ActionScalaDemo {
/**
 * Requirement: Action operators in practice
 * reduce: aggregate computation
 * collect: fetch all elements as a collection
 * take(n): fetch the first n elements
 * count: get the total number of elements
 * saveAsTextFile: save elements to a file
 * countByKey: count how many times each key appears
 * foreach: iterate over each element
 *
 * Created by simoniu
 */
def main(args: Array[String]): Unit = {
val sc = getSparkContext
//reduce: aggregate computation
reduceOp(sc)
//collect: fetch all elements as a collection
collectOp(sc)
//take(n): fetch the first n elements
takeOp(sc)
//count: get the total number of elements
countOp(sc)
//saveAsTextFile: save elements to a file
saveAsTextFileOp(sc)
//countByKey: count how many times each key appears
countByKeyOp(sc)
//foreach: iterate over each element
foreachOp(sc)
sc.stop()
}
def foreachOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array(1, 2, 3, 4, 5))
//Note: foreach is not limited to println; printing is only used here for testing.
//In real work, when the computed results need to be written to third-party storage,
//foreach is where you implement the code that outputs the data.
dataRDD.foreach(println)
}
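/**
 * A minimal sketch of writing results to external storage, as mentioned above.
 * Not from the original text: foreachPartition opens one connection per
 * partition instead of one per element, which is the usual pattern for
 * external sinks. The sink here is a hypothetical placeholder, not a real API.
 */
def foreachPartitionSketch(sc: SparkContext): Unit = {
  val dataRDD = sc.parallelize(Array(1, 2, 3, 4, 5))
  dataRDD.foreachPartition { it =>
    //Hypothetical: open a connection once per partition
    //val sink = HypotheticalSink.connect("host:port")
    it.foreach { item =>
      //sink.write(item)
      println(item) //stand-in for the real write
    }
    //sink.close()
  }
}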
def countByKeyOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array(("A", 1001), ("B", 1002), ("A", 1003), ("C", 1004)))
//The return value is a Map (key -> count)
val res = dataRDD.countByKey()
for ((k, v) <- res) {
println(k + "," + v)
}
}
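/**
 * Caveat (a sketch, not from the original text): countByKey returns the whole
 * Map to the Driver, so it only suits a small number of distinct keys. For
 * large key sets, the usual alternative is reduceByKey, which keeps the
 * counts distributed:
 */
def countByKeyScalableSketch(sc: SparkContext): Unit = {
  val dataRDD = sc.parallelize(Array(("A", 1001), ("B", 1002), ("A", 1003), ("C", 1004)))
  val counts = dataRDD.map(t => (t._1, 1L)).reduceByKey(_ + _)
  counts.collect().foreach { case (k, v) => println(k + "," + v) }
}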
def saveAsTextFileOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array(100, 200, 300, 400, 500))
//Just point it at an HDFS path; the target directory must not already exist
dataRDD.saveAsTextFile("hdfs://master:9000/scalademo/out")
}
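/**
 * A sketch (an assumption, not from the original text): because saveAsTextFile
 * fails if the output directory already exists, a common trick is to delete it
 * first via the Hadoop FileSystem API before saving.
 */
def deleteOutputPathIfExists(sc: SparkContext, pathStr: String): Unit = {
  val fs = org.apache.hadoop.fs.FileSystem.get(
    new java.net.URI("hdfs://master:9000"), sc.hadoopConfiguration)
  val path = new org.apache.hadoop.fs.Path(pathStr)
  if (fs.exists(path)) {
    fs.delete(path, true) //recursive delete
  }
}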
def countOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array(1, 2, 3, 4, 5))
val res = dataRDD.count()
println(res)
}
def takeOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array(1, 2, 3, 4, 5))
//Fetch the first 2 elements from the RDD
val res = dataRDD.take(2)
for (item <- res) {
println(item)
}
}
def collectOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array(1, 2, 3, 4, 5))
//collect returns an Array
//Note: if the RDD holds a large amount of data, collect is not recommended,
//because all of it is returned to the node where the Driver process runs.
//To peek at a few records and check the data format, use take(n) instead.
val res = dataRDD.collect()
for (item <- res) {
println(item)
}
}
def reduceOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array(1, 2, 3, 4, 5))
val num = dataRDD.reduce(_ + _)
println("sum=" + num)
}
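/**
 * A small extra sketch (not from the original text): reduce is not limited to
 * sums; any associative and commutative binary function works, e.g. taking
 * the maximum.
 */
def reduceMaxSketch(sc: SparkContext): Unit = {
  val dataRDD = sc.parallelize(Array(1, 2, 3, 4, 5))
  val max = dataRDD.reduce((a, b) => if (a > b) a else b)
  println("max=" + max)
}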
/**
 * Build the SparkContext
 *
 * @return
 */
private def getSparkContext = {
val conf = new SparkConf()
conf.setAppName("ActionOpScala")
.setMaster("local")
new SparkContext(conf)
}
}
package com.simoniu.sparkdemo.javademo;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
 * Requirement: Action operators in practice
 * reduce: aggregate computation
 * collect: fetch all elements as a collection
 * take(n): fetch the first n elements
 * count: get the total number of elements
 * saveAsTextFile: save elements to a file
 * countByKey: count how many times each key appears
 * foreach: iterate over each element
 *
 * Created by simoniu
 */
public class ActionJavaDemo {
public static void main(String[] args) {
JavaSparkContext sc = getSparkContext();
//reduce: aggregate computation
reduceOp(sc);
//collect: fetch all elements as a collection
collectOp(sc);
//take(n): fetch the first n elements
takeOp(sc);
//count: get the total number of elements
countOp(sc);
//saveAsTextFile: save elements to a file
saveAsTextFileOp(sc);
//countByKey: count how many times each key appears
countByKeyOp(sc);
//foreach: iterate over each element
foreachOp(sc);
sc.stop();
}
private static void foreachOp(JavaSparkContext sc) {
JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
dataRDD.foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer i) throws Exception {
System.out.println(i);
}
});
}
private static void countByKeyOp(JavaSparkContext sc) {
Tuple2<String, Integer> t1 = new Tuple2<>("A", 1001);
Tuple2<String, Integer> t2 = new Tuple2<>("B", 1002);
Tuple2<String, Integer> t3 = new Tuple2<>("A", 1003);
Tuple2<String, Integer> t4 = new Tuple2<>("C", 1004);
JavaRDD<Tuple2<String, Integer>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
//To use countByKey, first convert the RDD to a pair RDD with mapToPair
Map<String, Long> res = dataRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<String, Integer> tup)
throws Exception {
return new Tuple2<String, Integer>(tup._1, tup._2);
}
}).countByKey();
for(Map.Entry<String,Long> entry: res.entrySet()){
System.out.println(entry.getKey()+","+entry.getValue());
}
}
private static void saveAsTextFileOp(JavaSparkContext sc) {
JavaRDD<String> dataRDD = sc.parallelize(Arrays.asList("java", "c", "c++", "html", "python"));
dataRDD.saveAsTextFile("hdfs://master:9000/scalademo/out");
}
private static void countOp(JavaSparkContext sc) {
JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
long res = dataRDD.count();
System.out.println(res);
}
private static void takeOp(JavaSparkContext sc) {
JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
List<Integer> res = dataRDD.take(2);
for(Integer item : res){
System.out.println(item);
}
}
private static void collectOp(JavaSparkContext sc) {
JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
List<Integer> res = dataRDD.collect();
for(Integer item : res){
System.out.println(item);
}
}
private static void reduceOp(JavaSparkContext sc) {
JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
Integer num = dataRDD.reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
});
System.out.println(num);
}
private static JavaSparkContext getSparkContext() {
SparkConf conf = new SparkConf();
conf.setAppName("ActionOpJava")
.setMaster("local");
return new JavaSparkContext(conf);
}
}