TopN Anchor Statistics Case
Published: 2023-10-25 13:27:17

In the earlier chapter on Spark RDD persistence we already implemented this TopN anchor statistics case: for each region, compute the day's top N anchors by gold-coin income. Back then we built it with Spark transformation operators, which was fairly cumbersome and took a relatively large amount of code. Below we implement the same requirement with Spark SQL, and you will see that it really is much simpler.

First, recall our two raw data files; both are in JSON format.
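
The original sample records are not reproduced here. Judging from the fields referenced in the SQL below, each line of the two files plausibly looks like this (field names inferred from the query; the values are purely illustrative, not real data):

video_info (one JSON record per video): {"uid":"8407173251001","vid":"14943445328940001","area":"US"}
gift_record (one JSON record per gift): {"vid":"14943445328940001","gold":20}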

1. TopN Anchor Statistics

The concrete steps to implement this with Spark SQL are as follows:

  1. Load the two JSON files directly via SparkSession (see the note after this list)
  2. Register a temporary view for each of the two datasets
  3. Run the SQL that computes the TopN anchors
  4. Print the result to the console with foreach
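
A quick note on step 1: read.json(path) is simply shorthand for the generic load path, so either form below works. A minimal sketch, using the sparkSession created in the code that follows:

// read.json is a convenience method...
val videoInfoDf = sparkSession.read.json("D:\\uploadFiles\\video_info.log")
// ...equivalent to specifying the format explicitly and calling load
val videoInfoDf2 = sparkSession.read.format("json").load("D:\\uploadFiles\\video_info.log")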

The SQL is as follows:

select
    t4.area,
    concat_ws(',', collect_list(t4.topn)) as topn_list
from (
    -- t4: format each anchor as "uid:gold" so the top 3 per area can be
    -- joined into a single comma-separated string by the outer query
    select
        t3.area, concat(t3.uid, ':', cast(t3.gold_sum_all as int)) as topn
    from (
        -- t3: rank the anchors within each area by total gold income
        select
            t2.uid, t2.area, t2.gold_sum_all,
            row_number() over (partition by area order by gold_sum_all desc) as num
        from (
            -- t2: total gold income per anchor; max(area) is just a way to keep
            -- the area column through the group by (each anchor has one area)
            select
                t1.uid, max(t1.area) as area, sum(t1.gold_sum) as gold_sum_all
            from (
                -- t1: attach each video's gold total to its anchor and area
                select
                    vi.uid, vi.vid, vi.area, gr.gold_sum
                from
                    video_info as vi
                join
                (
                    -- gr: total gold received per video
                    select vid, sum(gold) as gold_sum
                    from gift_record
                    group by vid
                ) as gr
                on vi.vid = gr.vid
            ) as t1
            group by t1.uid
        ) as t2
    ) as t3
    where t3.num <= 3
) as t4
group by t4.area
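
For reference, the same logic can also be expressed with the DataFrame API instead of an SQL string. The sketch below is not from the original lesson; it assumes videoInfoDf and giftRecordDf are the two DataFrames loaded in the code that follows:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// gr: total gold received per video
val goldPerVid = giftRecordDf.groupBy("vid").agg(sum("gold").as("gold_sum"))
// t1/t2: join each video's gold total to its anchor, then sum per anchor
val perAnchor = videoInfoDf.join(goldPerVid, "vid")
  .groupBy("uid")
  .agg(max("area").as("area"), sum("gold_sum").as("gold_sum_all"))
// t3/t4: rank anchors within each area, keep the top 3, format and collect
val w = Window.partitionBy("area").orderBy(col("gold_sum_all").desc)
val topnDf = perAnchor
  .withColumn("num", row_number().over(w))
  .where(col("num") <= 3)
  .select(col("area"), concat(col("uid"), lit(":"), col("gold_sum_all").cast("int")).as("topn"))
  .groupBy("area")
  .agg(concat_ws(",", collect_list("topn")).as("topn_list"))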

1) The Scala implementation is as follows:

package com.simoniu.scalademo.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
 * Requirement: compute the TopN anchors
 * 1: Load the JSON data directly via SparkSession
 * 2: Register temporary views for the two datasets
 * 3: Run the SQL that computes the TopN anchors
 * 4: Print the result to the console with foreach
 * Created by simoniu
 */
object TopNAnchorScalaDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")

    // Create the SparkSession object; it wraps a SparkContext and an SQLContext
    val sparkSession = SparkSession.builder()
      .appName("LoadAndSaveOpScala")
      .config(conf)
      .getOrCreate()

    // 1: Load the JSON data directly via SparkSession
    val videoInfoDf = sparkSession.read.json("D:\\uploadFiles\\video_info.log")
    val giftRecordDf = sparkSession.read.json("D:\\uploadFiles\\gift_record.log")

    // 2: Register temporary views for the two datasets
    videoInfoDf.createOrReplaceTempView("video_info")
    giftRecordDf.createOrReplaceTempView("gift_record")

    // 3: Run the SQL that computes the TopN anchors
    val sql = "select " +
      "t4.area, " +
      "concat_ws(',',collect_list(t4.topn)) as topn_list " +
      "from( " +
      "select " +
      "t3.area,concat(t3.uid,':',cast(t3.gold_sum_all as int)) as topn " +
      "from( " +
      "select " +
      "t2.uid,t2.area,t2.gold_sum_all,row_number() over (partition by area order by gold_sum_all desc) as num " +
      "from( " +
      "select " +
      "t1.uid,max(t1.area) as area,sum(t1.gold_sum) as gold_sum_all " +
      "from( " +
      "select " +
      "vi.uid,vi.vid,vi.area,gr.gold_sum " +
      "from " +
      "video_info as vi " +
      "join " +
      "(select " +
      "vid,sum(gold) as gold_sum " +
      "from " +
      "gift_record " +
      "group by vid " +
      ")as gr " +
      "on vi.vid = gr.vid " +
      ") as t1 " +
      "group by t1.uid " +
      ") as t2 " +
      ")as t3 " +
      "where t3.num <=3 " +
      ") as t4 " +
      "group by t4.area "
    val resDf = sparkSession.sql(sql)

    // 4: Print the result to the console with foreach
    resDf.rdd.foreach(row => println(row.getAs[String]("area") + "\t" + row.getAs[String]("topn_list")))

    sparkSession.stop()
  }
}
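
Running this locally against the sample data from the RDD persistence chapter prints output along these lines (the same values the cluster run at the end of this article produces):

CN      8407173251008:120,8407173251003:60,8407173251014:50
ID      8407173251005:160,8407173251010:140,8407173251002:70
US      8407173251015:180,8407173251012:70,8407173251001:60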

2) The Java implementation is as follows:

package com.simoniu.sparkdemo.javademo.sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * Requirement: compute the TopN anchors
 * 1: Load the JSON data directly via SparkSession
 * 2: Register temporary views for the two datasets
 * 3: Run the SQL that computes the TopN anchors
 * 4: Print the result to the console with foreach
 * Created by simoniu
 */
public class TopNAnchorJavaDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");

        // Create the SparkSession object; it wraps a SparkContext and an SQLContext
        SparkSession sparkSession = SparkSession.builder()
                .appName("TopNAnchorJava")
                .config(conf)
                .getOrCreate();

        // 1: Load the JSON data directly via SparkSession
        Dataset<Row> videoInfoDf = sparkSession.read().json("D:\\uploadFiles\\video_info.log");
        Dataset<Row> giftRecordDf = sparkSession.read().json("D:\\uploadFiles\\gift_record.log");

        // 2: Register temporary views for the two datasets
        videoInfoDf.createOrReplaceTempView("video_info");
        giftRecordDf.createOrReplaceTempView("gift_record");

        // 3: Run the SQL that computes the TopN anchors
        String sql = "select " +
                "t4.area, " +
                "concat_ws(',',collect_list(t4.topn)) as topn_list " +
                "from( " +
                "select " +
                "t3.area,concat(t3.uid,':',cast(t3.gold_sum_all as int)) as topn " +
                "from( " +
                "select " +
                "t2.uid,t2.area,t2.gold_sum_all,row_number() over (partition by area order by gold_sum_all desc) as num " +
                "from( " +
                "select " +
                "t1.uid,max(t1.area) as area,sum(t1.gold_sum) as gold_sum_all " +
                "from( " +
                "select " +
                "vi.uid,vi.vid,vi.area,gr.gold_sum " +
                "from " +
                "video_info as vi " +
                "join " +
                "(select " +
                "vid,sum(gold) as gold_sum " +
                "from " +
                "gift_record " +
                "group by vid " +
                ")as gr " +
                "on vi.vid = gr.vid " +
                ") as t1 " +
                "group by t1.uid " +
                ") as t2 " +
                ")as t3 " +
                "where t3.num <=3 " +
                ") as t4 " +
                "group by t4.area ";
        Dataset<Row> resDf = sparkSession.sql(sql);

        // 4: Print the result to the console with foreach
        resDf.javaRDD().foreach(new VoidFunction<Row>() {
            @Override
            public void call(Row row) throws Exception {
                System.out.println(row.getString(0) + "\t" + row.getString(1));
            }
        });

        sparkSession.stop();
    }

}

2. Submitting the Job to the Cluster

Create a new object, TopNAnchorClusterScalaDemo, and modify the code so the job's output is saved to HDFS. (In yarn-cluster mode the driver runs inside the cluster, so anything printed with foreach would only show up in the YARN logs; writing the result to HDFS makes it easy to retrieve.)

package com.simoniu.scalademo.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * Requirement: compute the TopN anchors
 * 1: Load the JSON data directly via SparkSession
 * 2: Register temporary views for the two datasets
 * 3: Run the SQL that computes the TopN anchors
 * 4: Save the result to HDFS
 * Created by simoniu
 */

object TopNAnchorClusterScalaDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()

    // Create the SparkSession object; it wraps a SparkContext and an SQLContext
    val sparkSession = SparkSession.builder()
      .appName("LoadAndSaveOpScala")
      .config(conf)
      .getOrCreate()

    // 1: Load the JSON data directly via SparkSession
    val videoInfoDf = sparkSession.read.json("hdfs://master:9000/topn/video_info.log")
    val giftRecordDf = sparkSession.read.json("hdfs://master:9000/topn/gift_record.log")

    // 2: Register temporary views for the two datasets
    videoInfoDf.createOrReplaceTempView("video_info")
    giftRecordDf.createOrReplaceTempView("gift_record")

    // 3: Run the SQL that computes the TopN anchors
    val sql ="select "+
      "t4.area, "+
      "concat_ws(',',collect_list(t4.topn)) as topn_list "+
      "from( "+
      "select "+
      "t3.area,concat(t3.uid,':',cast(t3.gold_sum_all as int)) as topn "+
      "from( "+
      "select "+
      "t2.uid,t2.area,t2.gold_sum_all,row_number() over (partition by area order by gold_sum_all desc) as num "+
      "from( "+
      "select "+
      "t1.uid,max(t1.area) as area,sum(t1.gold_sum) as gold_sum_all "+
      "from( "+
      "select "+
      "vi.uid,vi.vid,vi.area,gr.gold_sum "+
      "from "+
      "video_info as vi "+
      "join "+
      "(select "+
      "vid,sum(gold) as gold_sum "+
      "from "+
      "gift_record "+
      "group by vid "+
      ")as gr "+
      "on vi.vid = gr.vid "+
      ") as t1 "+
      "group by t1.uid "+
      ") as t2 "+
      ")as t3 "+
      "where t3.num <=3 "+
      ") as t4 "+
      "group by t4.area "
    val resDf = sparkSession.sql(sql)

    // 4: Save the result to HDFS as text files
    resDf.rdd
      .map(row=>row.getAs[String]("area")+"\t"+row.getAs[String]("topn_list"))
      .saveAsTextFile("hdfs://master:9000/out-topn")

    sparkSession.stop()
  }

}
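
One caveat the original text does not mention: saveAsTextFile throws a FileAlreadyExistsException if the output directory already exists, so delete it before rerunning the job:

hdfs dfs -rm -r /out-topn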

Create the spark-submit script topNJob.sh:

spark-submit \
--class com.simoniu.scalademo.sql.TopNAnchorClusterScalaDemo \
--master yarn \
--deploy-mode cluster \
--executor-memory 1g \
--num-executors 5 \
--executor-cores 2 \
--conf "spark.default.parallelism=10" \
sparksubmitdemo.jar
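
For reference, these settings give the job 5 executors × 2 cores = 10 concurrent task slots, which lines up with spark.default.parallelism=10.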

Submit the job:

[root@master examples]# sh -x topNJob.sh

Check the result:

[root@master examples]# hdfs dfs -cat /out-topn/*
CN      8407173251008:120,8407173251003:60,8407173251014:50
ID      8407173251005:160,8407173251010:140,8407173251002:70
US      8407173251015:180,8407173251012:70,8407173251001:60