TopN Anchor Statistics Case
In the earlier chapter on Spark RDD persistence we already implemented this TopN anchor case: computing, for each region, the TopN anchors by that day's gold-coin income. Back then we solved it with Spark's transformation operators, which was fairly cumbersome and required quite a lot of code. Here we implement the same case with the Spark SQL we just learned, and you will see that the SQL version is indeed much simpler.
First, let's review our two raw data files; both are in JSON format:
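The sample records themselves are not reproduced here, but from the fields the SQL below relies on, each line of the two files should look roughly like the following (the field values, including vid, are hypothetical and for illustration only):

video_info.log, one JSON object per line (uid = anchor id, vid = video/live-session id, area = region):

{"uid":"8407173251001","vid":"14943445328940001","area":"US"}

gift_record.log (vid = video id, gold = gold coins for one gift):

{"vid":"14943445328940001","gold":10}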
The Spark SQL implementation builds the result in four layers, from the inside out: aggregate the gift records to get each video's total gold (gold_sum per vid), and join that with video_info to attach each video's uid and area (t1); sum per anchor to get gold_sum_all (t2; max(t1.area) appears only because every non-grouped column must be wrapped in an aggregate, and each anchor belongs to a single area); rank the anchors within each area with row_number() (t3); then keep the top 3 per area and collect their "uid:gold" entries into one comma-separated list (t4). The full SQL:
select
    t4.area,
    concat_ws(',', collect_list(t4.topn)) as topn_list
from (
    select
        t3.area,
        concat(t3.uid, ':', cast(t3.gold_sum_all as int)) as topn
    from (
        select
            t2.uid, t2.area, t2.gold_sum_all,
            row_number() over (partition by area order by gold_sum_all desc) as num
        from (
            select
                t1.uid,
                max(t1.area) as area,
                sum(t1.gold_sum) as gold_sum_all
            from (
                select
                    vi.uid, vi.vid, vi.area, gr.gold_sum
                from video_info as vi
                join (
                    select vid, sum(gold) as gold_sum
                    from gift_record
                    group by vid
                ) as gr on vi.vid = gr.vid
            ) as t1
            group by t1.uid
        ) as t2
    ) as t3
    where t3.num <= 3
) as t4
group by t4.area
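To make the layering concrete with hypothetical numbers: if a CN anchor streamed two videos that earned 70 and 50 gold in gifts, t1 holds two rows for that uid (gold_sum 70 and 50), t2 collapses them into one row with gold_sum_all = 120, t3 ranks that row against the other CN anchors by gold_sum_all, and t4 turns the surviving top-3 rows into "uid:120"-style entries collected into CN's topn_list.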
1) Scala implementation:
package com.simoniu.scalademo.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * Requirement: compute the TopN anchors
 * 1: load the JSON data directly through sparkSession
 * 2: register the two datasets as temporary views
 * 3: run the SQL that computes the TopN anchors
 * 4: print the result to the console with foreach
 * Created by simoniu
 */
object TopNAnchorScalaDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
    // Create the SparkSession, which wraps the SparkContext and SqlContext
    val sparkSession = SparkSession.builder()
      .appName("TopNAnchorScala")
      .config(conf)
      .getOrCreate()
    // 1: load the JSON data directly
    val videoInfoDf = sparkSession.read.json("D:\\uploadFiles\\video_info.log")
    val giftRecordDf = sparkSession.read.json("D:\\uploadFiles\\gift_record.log")
    // 2: register the two datasets as temporary views
    videoInfoDf.createOrReplaceTempView("video_info")
    giftRecordDf.createOrReplaceTempView("gift_record")
    // 3: run the SQL that computes the TopN anchors
    val sql =
      """select
        |    t4.area,
        |    concat_ws(',', collect_list(t4.topn)) as topn_list
        |from (
        |    select
        |        t3.area,
        |        concat(t3.uid, ':', cast(t3.gold_sum_all as int)) as topn
        |    from (
        |        select
        |            t2.uid, t2.area, t2.gold_sum_all,
        |            row_number() over (partition by area order by gold_sum_all desc) as num
        |        from (
        |            select
        |                t1.uid,
        |                max(t1.area) as area,
        |                sum(t1.gold_sum) as gold_sum_all
        |            from (
        |                select
        |                    vi.uid, vi.vid, vi.area, gr.gold_sum
        |                from video_info as vi
        |                join (
        |                    select vid, sum(gold) as gold_sum
        |                    from gift_record
        |                    group by vid
        |                ) as gr on vi.vid = gr.vid
        |            ) as t1
        |            group by t1.uid
        |        ) as t2
        |    ) as t3
        |    where t3.num <= 3
        |) as t4
        |group by t4.area""".stripMargin
    val resDf = sparkSession.sql(sql)
    // 4: print the result to the console with foreach
    resDf.rdd.foreach(row => println(row.getAs[String]("area") + "\t" + row.getAs[String]("topn_list")))
    sparkSession.stop()
  }
}
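For comparison, the same pipeline can also be expressed with the DataFrame API instead of an embedded SQL string. The following is a minimal sketch, not part of the original example, assuming the videoInfoDf and giftRecordDf loaded above:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Total gold per video
val goldPerVid = giftRecordDf.groupBy("vid").agg(sum("gold").as("gold_sum"))
// Attach uid and area, then total gold per anchor
val perUid = videoInfoDf.join(goldPerVid, "vid")
  .groupBy("uid")
  .agg(max("area").as("area"), sum("gold_sum").as("gold_sum_all"))
// Rank anchors within each area and keep the top 3
val byAreaDesc = Window.partitionBy("area").orderBy(col("gold_sum_all").desc)
val resDf2 = perUid
  .withColumn("num", row_number().over(byAreaDesc))
  .filter(col("num") <= 3)
  .withColumn("topn", concat(col("uid"), lit(":"), col("gold_sum_all").cast("int")))
  .groupBy("area")
  .agg(concat_ws(",", collect_list("topn")).as("topn_list"))

Both versions go through the same Catalyst optimizer, so this is mainly a readability choice.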
2) Java implementation:
package com.simoniu.sparkdemo.javademo.sql;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * Requirement: compute the TopN anchors
 * 1: load the JSON data directly through sparkSession
 * 2: register the two datasets as temporary views
 * 3: run the SQL that computes the TopN anchors
 * 4: print the result to the console with foreach
 * Created by simoniu
 */
public class TopNAnchorJavaDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        // Create the SparkSession, which wraps the SparkContext and SqlContext
        SparkSession sparkSession = SparkSession.builder()
                .appName("TopNAnchorJava")
                .config(conf)
                .getOrCreate();
        // 1: load the JSON data directly
        Dataset<Row> videoInfoDf = sparkSession.read().json("D:\\uploadFiles\\video_info.log");
        Dataset<Row> giftRecordDf = sparkSession.read().json("D:\\uploadFiles\\gift_record.log");
        // 2: register the two datasets as temporary views
        videoInfoDf.createOrReplaceTempView("video_info");
        giftRecordDf.createOrReplaceTempView("gift_record");
        // 3: run the SQL that computes the TopN anchors (same query as the Scala version)
        String sql = "select " +
                "t4.area, " +
                "concat_ws(',', collect_list(t4.topn)) as topn_list " +
                "from ( " +
                "  select t3.area, concat(t3.uid, ':', cast(t3.gold_sum_all as int)) as topn " +
                "  from ( " +
                "    select t2.uid, t2.area, t2.gold_sum_all, " +
                "      row_number() over (partition by area order by gold_sum_all desc) as num " +
                "    from ( " +
                "      select t1.uid, max(t1.area) as area, sum(t1.gold_sum) as gold_sum_all " +
                "      from ( " +
                "        select vi.uid, vi.vid, vi.area, gr.gold_sum " +
                "        from video_info as vi " +
                "        join ( " +
                "          select vid, sum(gold) as gold_sum " +
                "          from gift_record " +
                "          group by vid " +
                "        ) as gr on vi.vid = gr.vid " +
                "      ) as t1 " +
                "      group by t1.uid " +
                "    ) as t2 " +
                "  ) as t3 " +
                "  where t3.num <= 3 " +
                ") as t4 " +
                "group by t4.area";
        Dataset<Row> resDf = sparkSession.sql(sql);
        // 4: print the result to the console with foreach
        // Positional getters rely on the select order: column 0 = area, column 1 = topn_list
        resDf.javaRDD().foreach(row ->
                System.out.println(row.getString(0) + "\t" + row.getString(1)));
        sparkSession.stop();
    }
}
Now create a new object, TopNAnchorClusterScalaDemo, and modify the code so that the job's output is written to HDFS instead of the console.
package com.simoniu.scalademo.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * Requirement: compute the TopN anchors
 * 1: load the JSON data from HDFS through sparkSession
 * 2: register the two datasets as temporary views
 * 3: run the SQL that computes the TopN anchors
 * 4: save the result to HDFS
 * Created by simoniu
 */
object TopNAnchorClusterScalaDemo {
  def main(args: Array[String]): Unit = {
    // No setMaster here: the master is supplied by spark-submit
    val conf = new SparkConf()
    // Create the SparkSession, which wraps the SparkContext and SqlContext
    val sparkSession = SparkSession.builder()
      .appName("TopNAnchorClusterScala")
      .config(conf)
      .getOrCreate()
    // 1: load the JSON data from HDFS
    val videoInfoDf = sparkSession.read.json("hdfs://master:9000/topn/video_info.log")
    val giftRecordDf = sparkSession.read.json("hdfs://master:9000/topn/gift_record.log")
    // 2: register the two datasets as temporary views
    videoInfoDf.createOrReplaceTempView("video_info")
    giftRecordDf.createOrReplaceTempView("gift_record")
    // 3: run the SQL that computes the TopN anchors (same query as above)
    val sql =
      """select
        |    t4.area,
        |    concat_ws(',', collect_list(t4.topn)) as topn_list
        |from (
        |    select
        |        t3.area,
        |        concat(t3.uid, ':', cast(t3.gold_sum_all as int)) as topn
        |    from (
        |        select
        |            t2.uid, t2.area, t2.gold_sum_all,
        |            row_number() over (partition by area order by gold_sum_all desc) as num
        |        from (
        |            select
        |                t1.uid,
        |                max(t1.area) as area,
        |                sum(t1.gold_sum) as gold_sum_all
        |            from (
        |                select
        |                    vi.uid, vi.vid, vi.area, gr.gold_sum
        |                from video_info as vi
        |                join (
        |                    select vid, sum(gold) as gold_sum
        |                    from gift_record
        |                    group by vid
        |                ) as gr on vi.vid = gr.vid
        |            ) as t1
        |            group by t1.uid
        |        ) as t2
        |    ) as t3
        |    where t3.num <= 3
        |) as t4
        |group by t4.area""".stripMargin
    val resDf = sparkSession.sql(sql)
    // 4: save the result to HDFS as "area \t topn_list" text lines
    resDf.rdd
      .map(row => row.getAs[String]("area") + "\t" + row.getAs[String]("topn_list"))
      .saveAsTextFile("hdfs://master:9000/out-topn")
    sparkSession.stop()
  }
}
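One caveat when resubmitting: saveAsTextFile fails if the output directory already exists. You can either remove it first with hdfs dfs -rm -r /out-topn, or delete it from inside the job before writing. A minimal sketch of the latter, using Hadoop's FileSystem API (this is an addition to the code above, not part of the original example):

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

// Delete the output directory if it already exists so the job can be rerun
val fs = FileSystem.get(new URI("hdfs://master:9000"), sparkSession.sparkContext.hadoopConfiguration)
val outputPath = new Path("/out-topn")
if (fs.exists(outputPath)) {
  fs.delete(outputPath, true)
}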
Create a spark-submit script, topNJob.sh:
spark-submit \
--class com.simoniu.scalademo.sql.TopNAnchorClusterScalaDemo \
--master yarn \
--deploy-mode cluster \
--executor-memory 1g \
--num-executors 5 \
--executor-cores 2 \
--conf "spark.default.parallelism=10" \
sparksubmitdemo.jar
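These flags request 5 executors with 2 cores each, i.e. 10 cores in total, which is presumably why spark.default.parallelism is set to 10: roughly one task per core for the shuffle stages. sparksubmitdemo.jar is the jar packaged from the project above.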
Submit the job:
[root@master examples]# sh -x topNJob.sh
Check the results:
[root@master examples]# hdfs dfs -cat /out-topn/*
CN 8407173251008:120,8407173251003:60,8407173251014:50
ID 8407173251005:160,8407173251010:140,8407173251002:70
US 8407173251015:180,8407173251012:70,8407173251001:60