Common Transformation operators.
The commonly used transformation operators are summarized in the table below and then demonstrated in the Scala and Java examples that follow.
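map          - multiply every element of a collection by 2
filter       - keep only the even numbers in a collection
flatMap      - split lines of text into words
groupByKey   - group the streamers by region
reduceByKey  - count the number of streamers per region
sortByKey    - sort streamers by gift income
join         - print each streamer's region and gift income
distinct     - de-duplicate the regions that had live streams today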

package com.simoniu.scalademo
import org.apache.spark.{SparkConf, SparkContext}
/**
* Requirements: Transformation operators in practice
* map: multiply every element of the collection by 2
* filter: keep only the even numbers in the collection
* flatMap: split each line into words
* groupByKey: group the streamers by region
* reduceByKey: count the number of streamers per region
* sortByKey: sort streamers by gift income
* join: print each streamer's region and gift income
* distinct: de-duplicate the regions that had live streams today
*
* Created by simoniu
*/
object TransformationScalaDemo {
def main(args: Array[String]): Unit = {
val sc = getSparkContext
//map: multiply every element of the collection by 2
mapOp(sc)
//filter: keep only the even numbers in the collection
filterOp(sc)
//flatMap: split each line into words
flatMapOp(sc)
//groupByKey: group the streamers by region
//groupByKeyOp(sc)
groupByKeyOp2(sc)
//reduceByKey: count the number of streamers per region
reduceByKeyOp(sc)
//sortByKey: sort streamers by gift income
sortByKeyOp(sc)
//join: print each streamer's region and gift income
joinOp(sc)
//distinct: de-duplicate the regions that had live streams today
distinctOp(sc)
sc.stop()
}
def distinctOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array((150001, "US"), (150002, "CN"), (150003, "CN"), (150004, "IN"),(150005, "US")))
//Since we only care about the regions that had live streams, keep just the region field and de-duplicate on it
dataRDD.map(_._2).distinct().foreach(println(_))
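//If the goal were instead to count the number of distinct streamers who went live, a minimal sketch would be:
//dataRDD.map(_._1).distinct().count() -- de-duplicate on the uid; count() is an action, so the result comes back to the driver as a Long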
}
def joinOp(sc: SparkContext): Unit = {
val dataRDD1 = sc.parallelize(Array((150001, "US"), (150002, "CN"), (150003, "CN"), (150004, "IN")))
val dataRDD2 = sc.parallelize(Array((150001, 400), (150002, 200), (150003, 300), (150004, 100)))
val joinRDD = dataRDD1.join(dataRDD2)
//joinRDD.foreach(println(_))
joinRDD.foreach(tup => {
//streamer (user) id
val uid = tup._1
val area_gold = tup._2
//region
val area = area_gold._1
//gift income
val gold = area_gold._2
println(uid + "\t" + area + "\t" + gold)
})
}
def sortByKeyOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array((150001, 400), (150002, 200), (150003, 300), (150004, 100)))
//Since we need to sort by gift income, the income has to become the key, so swap the positions inside the tuple
/*dataRDD.map(tup=>(tup._2,tup._1))
.sortByKey(false)//ascending by default (first parameter true); pass false to sort in descending order
.foreach(println(_))*/
//Using sortBy instead: the sort field can be specified dynamically, which is more flexible
dataRDD.sortBy(_._2, false).foreach(println(_))
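//A sketch of an alternative when only the top N records are needed: takeOrdered keeps a bounded number of
//elements per partition and returns them to the driver (assuming they fit in driver memory), e.g.
//dataRDD.takeOrdered(3)(Ordering.by[(Int, Int), Int](_._2).reverse) gives the 3 highest-earning streamers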
}
def reduceByKeyOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array((150001, "US"), (150002, "CN"), (150003, "CN"), (150004, "JP"), (150005, "JP")))
//This requirement only needs the region, so the map operation keeps just the region field
//To count per region, append a 1 after the region to build a tuple2, which makes the data usable with reduceByKey
dataRDD.map(tup => (tup._2, 1)).reduceByKey(_ + _).foreach(println(_))
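//A sketch of an alternative when the result set is small: the countByKey action returns the per-key counts
//straight to the driver as a Map, e.g. dataRDD.map(tup => (tup._2, 1)).countByKey()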
}
def groupByKeyOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array((150001, "US"), (150002, "CN"), (150003, "CN"), (150004, "JP"), (150005, "JP")))
//Use map to swap the two fields of the tuple, because the region needs to be the key for the grouping operation;
//the key is the first element of the tuple, so at this point the tuple can be treated as a key-value pair
//Note: before using key-based operators such as groupByKey, the data in the RDD must be assembled into tuple2 form
//After the map operator, the new records look like this: ("US",150001)
//What if the tuple has more than 2 fields? See groupByKeyOp2
dataRDD.map(tup => (tup._2, tup._1)).groupByKey().foreach(tup => {
//get the region
val area = tup._1
print(area + ":")
//get all the user ids that belong to this region
val it = tup._2
for (uid <- it) {
print(uid + " ")
}
println()
})
}
def groupByKeyOp2(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array((150001, "US", "male"), (150002, "CN", "female"), (150003, "CN", "male"), (150004, "IN", "female")))
//What if the tuple has more than 2 fields?
//Put the field that should act as the key first, and wrap the remaining fields in another tuple2
//After the map operator, the new records look like this: ("US",(150001,"male"))
//Note: if your data structure is complex, you can call foreach after each operator to print and verify the data format
dataRDD.map(tup => (tup._2, (tup._1, tup._3))).groupByKey().foreach(tup => {
//get the region
val area = tup._1
print(area + ":")
//get all the user ids and genders that belong to this region
val it = tup._2
for ((uid, sex) <- it) {
print("<" + uid + "," + sex + "> ")
}
println()
})
}
def flatMapOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array("good good study", "day day up"))
dataRDD.flatMap(_.split(" ")).foreach(println(_))
}
def filterOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array(1, 2, 3, 4, 5))
dataRDD.filter(_ % 2 == 0).foreach(println(_))
}
def mapOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array(1, 2, 3, 4, 5))
dataRDD.map(_ * 2).foreach(println(_))
}
/**
* Build the SparkContext
*
* @return
*/
private def getSparkContext = {
val conf = new SparkConf()
conf.setAppName("TransformationOpScala")
.setMaster("local")
new SparkContext(conf)
}
}
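The same set of Transformation examples, implemented with Spark's Java API: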
package com.simoniu.sparkdemo.javademo;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;
import scala.Tuple3;
import java.util.Arrays;
import java.util.Iterator;
/**
* Requirements:
* map: multiply every element of the collection by 2
* filter: keep only the even numbers in the collection
* flatMap: split each line into words
* groupByKey: group the streamers by region
* reduceByKey: count the number of streamers per region
* sortByKey: sort streamers by gift income
* join: print each streamer's region and gift income
* distinct: de-duplicate the regions that had live streams today
* <p>
* Created by simoniu
*/
public class TransformationJavaDemo {
public static void main(String[] args) {
JavaSparkContext sc = getSparkContext();
//map: multiply every element of the collection by 2
mapOp(sc);
//filter: keep only the even numbers in the collection
filterOp(sc);
//flatMap: split each line into words
flatMapOp(sc);
//groupByKey: group the streamers by region
groupByKeyOp(sc);
groupByKeyOp2(sc);
//reduceByKey: count the number of streamers per region
reduceByKeyOp(sc);
//sortByKey: sort streamers by gift income
sortByKeyOp(sc);
//join: print each streamer's region and gift income
joinOp(sc);
//distinct: de-duplicate the regions that had live streams today
distinctOp(sc);
sc.stop();
}
private static void distinctOp(JavaSparkContext sc) {
Tuple2<Integer, String> t1 = new Tuple2<Integer, String>(150001, "US");
Tuple2<Integer, String> t2 = new Tuple2<Integer, String>(150002, "CN");
Tuple2<Integer, String> t3 = new Tuple2<Integer, String>(150003, "CN");
Tuple2<Integer, String> t4 = new Tuple2<Integer, String>(150004, "IN");
JavaRDD<Tuple2<Integer, String>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
dataRDD.map(new Function<Tuple2<Integer, String>, String>() {
@Override
public String call(Tuple2<Integer, String> tup) throws Exception {
return tup._2;
}
}).distinct().foreach(new VoidFunction<String>() {
@Override
public void call(String area) throws Exception {
System.out.println(area);
}
});
}
private static void joinOp(JavaSparkContext sc) {
Tuple2<Integer, String> t1 = new Tuple2<Integer, String>(150001, "US");
Tuple2<Integer, String> t2 = new Tuple2<Integer, String>(150002, "CN");
Tuple2<Integer, String> t3 = new Tuple2<Integer, String>(150003, "CN");
Tuple2<Integer, String> t4 = new Tuple2<Integer, String>(150004, "IN");
Tuple2<Integer, Integer> t5 = new Tuple2<Integer, Integer>(150001, 400);
Tuple2<Integer, Integer> t6 = new Tuple2<Integer, Integer>(150002, 200);
Tuple2<Integer, Integer> t7 = new Tuple2<Integer, Integer>(150003, 300);
Tuple2<Integer, Integer> t8 = new Tuple2<Integer, Integer>(150004, 100);
JavaRDD<Tuple2<Integer, String>> dataRDD1 = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
JavaRDD<Tuple2<Integer, Integer>> dataRDD2 = sc.parallelize(Arrays.asList(t5, t6, t7, t8));
JavaPairRDD<Integer, String> dataRDD1Pair = dataRDD1.mapToPair(new PairFunction<Tuple2<Integer, String>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<Integer, String> tup)
throws Exception {
return new Tuple2<Integer, String>(tup._1, tup._2);
}
});
JavaPairRDD<Integer, Integer> dataRDD2Pair = dataRDD2.mapToPair(new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> tup)
throws Exception {
return new Tuple2<Integer, Integer>(tup._1, tup._2);
}
});
dataRDD1Pair.join(dataRDD2Pair).foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
@Override
public void call(Tuple2<Integer, Tuple2<String, Integer>> tup) throws Exception {
System.out.println(tup);
}
});
}
private static void sortByKeyOp(JavaSparkContext sc) {
Tuple2<Integer, Integer> t1 = new Tuple2<Integer, Integer>(150001, 400);
Tuple2<Integer, Integer> t2 = new Tuple2<Integer, Integer>(150002, 200);
Tuple2<Integer, Integer> t3 = new Tuple2<Integer, Integer>(150003, 300);
Tuple2<Integer, Integer> t4 = new Tuple2<Integer, Integer>(150004, 100);
JavaRDD<Tuple2<Integer, Integer>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
/*dataRDD.mapToPair(new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> tup)
throws Exception {
return new Tuple2<Integer, Integer>(tup._2,tup._1);
}
}).sortByKey(false).foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
@Override
public void call(Tuple2<Integer, Integer> tup) throws Exception {
System.out.println(tup);
}
});*/
//using sortBy
dataRDD.sortBy(new Function<Tuple2<Integer, Integer>, Integer>() {
@Override
public Integer call(Tuple2<Integer, Integer> tup) throws Exception {
return tup._2;
}
}, false, 1).foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
@Override
public void call(Tuple2<Integer, Integer> tup) throws Exception {
System.out.println(tup);
}
});
}
private static void reduceByKeyOp(JavaSparkContext sc) {
Tuple2<Integer, String> t1 = new Tuple2<Integer, String>(150001, "US");
Tuple2<Integer, String> t2 = new Tuple2<Integer, String>(150002, "CN");
Tuple2<Integer, String> t3 = new Tuple2<Integer, String>(150003, "CN");
Tuple2<Integer, String> t4 = new Tuple2<Integer, String>(150004, "IN");
JavaRDD<Tuple2<Integer, String>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
dataRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> tup)
throws Exception {
return new Tuple2<String, Integer>(tup._2, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
}).foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> tup) throws Exception {
System.out.println(tup);
}
});
}
private static void groupByKeyOp(JavaSparkContext sc) {
Tuple2<Integer, String> t1 = new Tuple2<Integer, String>(150001, "US");
Tuple2<Integer, String> t2 = new Tuple2<Integer, String>(150002, "CN");
Tuple2<Integer, String> t3 = new Tuple2<Integer, String>(150003, "CN");
Tuple2<Integer, String> t4 = new Tuple2<Integer, String>(150004, "IN");
JavaRDD<Tuple2<Integer, String>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
//To use the ...ByKey operators in the Java API, the RDD must first be converted with a ...ToPair operator
dataRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> tup)
throws Exception {
return new Tuple2<String, Integer>(tup._2, tup._1);
}
}).groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
@Override
public void call(Tuple2<String, Iterable<Integer>> tup) throws Exception {
//get the region
String area = tup._1;
System.out.print(area + ":");
//get all the user ids that belong to this region
Iterable<Integer> it = tup._2;
for (Integer uid : it) {
System.out.print(uid + " ");
}
System.out.println();
}
});
}
private static void groupByKeyOp2(JavaSparkContext sc) {
Tuple3<Integer, String, String> t1 = new Tuple3<Integer, String, String>(150001, "US", "male");
Tuple3<Integer, String, String> t2 = new Tuple3<Integer, String, String>(150002, "CN", "female");
Tuple3<Integer, String, String> t3 = new Tuple3<Integer, String, String>(150003, "CN", "male");
Tuple3<Integer, String, String> t4 = new Tuple3<Integer, String, String>(150004, "IN", "female");
JavaRDD<Tuple3<Integer, String, String>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
dataRDD.mapToPair(new PairFunction<Tuple3<Integer, String, String>, String, Tuple2<Integer, String>>() {
@Override
public Tuple2<String, Tuple2<Integer, String>> call(Tuple3<Integer, String, String> tup) throws Exception {
return new Tuple2<String, Tuple2<Integer, String>>(tup._2(), new Tuple2<Integer, String>(tup._1(), tup._3()));
}
}).groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<Tuple2<Integer, String>>>>() {
@Override
public void call(Tuple2<String, Iterable<Tuple2<Integer, String>>> tup)
throws Exception {
//get the region
String area = tup._1;
System.out.print(area + ":");
//get all the user ids and genders that belong to this region
Iterable<Tuple2<Integer, String>> it = tup._2;
for (Tuple2<Integer, String> tu : it) {
System.out.print("<" + tu._1 + "," + tu._2 + "> ");
}
System.out.println();
}
});
}
private static void flatMapOp(JavaSparkContext sc) {
JavaRDD<String> dataRDD = sc.parallelize(Arrays.asList("good good study", "day day up"));
dataRDD.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String line) throws Exception {
String[] words = line.split(" ");
return Arrays.asList(words).iterator();
}
}).foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
System.out.println(s);
}
});
}
private static void filterOp(JavaSparkContext sc) {
JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));
dataRDD.filter(new Function<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) throws Exception {
return integer % 2 == 0;
}
}).foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer integer) throws Exception {
System.out.println(integer);
}
});
}
private static void mapOp(JavaSparkContext sc) {
JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
dataRDD.map(new Function<Integer, Integer>() {
@Override
public Integer call(Integer i) throws Exception {
return i * 2;
}
}).foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer integer) throws Exception {
System.out.println(integer);
}
});
}
private static JavaSparkContext getSparkContext() {
SparkConf conf = new SparkConf();
conf.setAppName("TransformationJavaDemo")
.setMaster("local");
return new JavaSparkContext(conf);
}
}
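Both demos run in local mode (setMaster("local")), so the println calls inside foreach appear directly in the console; on a cluster, foreach executes on the executors and the output would end up in the executor logs instead. The functional interfaces in org.apache.spark.api.java.function are single-method types, so on Java 8+ the anonymous inner classes above can be replaced with lambdas. A minimal sketch of the map example in lambda form (the helper name mapOpLambda is made up for illustration; behavior matches the anonymous-class version):
private static void mapOpLambda(JavaSparkContext sc) {
JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
//multiply each element by 2 and print the results
dataRDD.map(i -> i * 2).foreach(i -> System.out.println(i));
}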