← 返回首页
TopN主播统计案例
发表时间:2023-10-15 04:06:50
TopN主播统计案例

TopN主播统计案例

1.需求分析

背景是这样的,我们有一款直播APP,已经在很多国家上线并运营了一段时间,产品经理希望开发一个功能,topN主播排行榜,按天更新排名信息,统计的维度有多种,其中有一个维度是针对主播当天直播的金币收入进行排名。

在我们的直播平台中有大区这个概念,一个大区下面包含多个国家,不同大区的运营策略是不一样的,所以就把不同国家划分到不同大区里面,方便运营。那这个TopN主播排行榜在统计的时候就需要分大区统计了。针对主播每天的开播数据我们已经有了,以及直播间内用户的送礼记录也都是有的。那这样其实就可以统计主播当天的金币收入了。主播一天可能会开播多次,所以后期在统计主播当天收入的时候是需要把他当天所有直播中的金币收入都计算在内的。

分析:我们有两份数据,数据都是json格式的。

基于以上两份数据,计算每个大区当天金币收入TopN的主播,其实就是按照当天主播所有开播的直播间内的收入汇总,按大区分组,统计每个大区内收入TopN的主播。

原始分析日志数据如下:

video_info.log

{"uid":"8407173251001","vid":"14943445328940001","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":101,"share_num":"21","type":"video_info"}
{"uid":"8407173251002","vid":"14943445328940002","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":201,"share_num":"331","type":"video_info"}
{"uid":"8407173251003","vid":"14943445328940003","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":221,"share_num":"321","type":"video_info"}
{"uid":"8407173251004","vid":"14943445328940004","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":401,"share_num":"311","type":"video_info"}
{"uid":"8407173251005","vid":"14943445328940005","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":31,"share_num":"131","type":"video_info"}
{"uid":"8407173251006","vid":"14943445328940006","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":22,"share_num":"3431","type":"video_info"}
{"uid":"8407173251007","vid":"14943445328940007","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":44,"share_num":"131","type":"video_info"}
{"uid":"8407173251008","vid":"14943445328940008","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":66,"share_num":"131","type":"video_info"}
{"uid":"8407173251009","vid":"14943445328940009","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":32,"share_num":"231","type":"video_info"}
{"uid":"8407173251010","vid":"14943445328940010","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":342,"share_num":"431","type":"video_info"}
{"uid":"8407173251011","vid":"14943445328940011","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":223,"share_num":"331","type":"video_info"}
{"uid":"8407173251012","vid":"14943445328940012","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":554,"share_num":"312","type":"video_info"}
{"uid":"8407173251013","vid":"14943445328940013","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":334,"share_num":"321","type":"video_info"}
{"uid":"8407173251014","vid":"14943445328940014","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":653,"share_num":"311","type":"video_info"}
{"uid":"8407173251015","vid":"14943445328940015","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":322,"share_num":"231","type":"video_info"}
{"uid":"8407173251001","vid":"14943445328940016","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":432,"share_num":"531","type":"video_info"}
{"uid":"8407173251005","vid":"14943445328940017","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":322,"share_num":"231","type":"video_info"}
{"uid":"8407173251008","vid":"14943445328940018","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":564,"share_num":"131","type":"video_info"}
{"uid":"8407173251010","vid":"14943445328940019","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":324,"share_num":"231","type":"video_info"}
{"uid":"8407173251015","vid":"14943445328940020","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":532,"share_num":"331","type":"video_info"}

gift_record.log

{"uid":"7201232141001","vid":"14943445328940001","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141002","vid":"14943445328940001","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141003","vid":"14943445328940002","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141004","vid":"14943445328940002","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141005","vid":"14943445328940003","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141006","vid":"14943445328940003","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141007","vid":"14943445328940004","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141008","vid":"14943445328940004","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141009","vid":"14943445328940005","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141010","vid":"14943445328940005","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141011","vid":"14943445328940006","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141012","vid":"14943445328940006","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141013","vid":"14943445328940007","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141014","vid":"14943445328940007","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141015","vid":"14943445328940008","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141016","vid":"14943445328940008","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141017","vid":"14943445328940009","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141018","vid":"14943445328940009","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141019","vid":"14943445328940010","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141020","vid":"14943445328940010","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141021","vid":"14943445328940011","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141022","vid":"14943445328940011","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141023","vid":"14943445328940012","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141024","vid":"14943445328940012","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141025","vid":"14943445328940013","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141026","vid":"14943445328940013","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141027","vid":"14943445328940014","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141028","vid":"14943445328940014","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141029","vid":"14943445328940015","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141030","vid":"14943445328940015","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141031","vid":"14943445328940016","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141032","vid":"14943445328940016","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141033","vid":"14943445328940017","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141034","vid":"14943445328940017","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141035","vid":"14943445328940018","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141036","vid":"14943445328940018","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141037","vid":"14943445328940019","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141038","vid":"14943445328940019","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141039","vid":"14943445328940020","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141040","vid":"14943445328940020","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}

2.代码实现

具体实现步骤如下:

1:首先获取两份数据中的核心字段,使用fastjson包解析数据 主播开播记录:主播ID:uid,直播间ID:vid,大区:area (vid,(uid,area)) 用户送礼记录:直播间ID:vid,金币数量:gold (vid,gold)

这样的可以把这两份数据关联到一块就能获取到大区、主播、金币这些信息了,使用直播间vid进行关联。

2:对用户送礼记录数据进行聚合,对相同vid的数据求和。因为用户可能在一次直播中给主播送多次礼物 (vid,gold_sum)

3:把这两份数据join到一块,vid作为join的key (vid,((uid,area),gold_sum))

4:使用map迭代join之后的数据,最后获取到uid,area,gold_sum字段,由于一个主播一天可能会开播多次,后面需要基于uid和area再做一次聚合,所以把数据转换成这种格式。uid和area是一一对应的,一个人只能属于一个大区。

((uid,area),gold_sum)

5:使用reduceByKey算子对数据进行聚合 ((uid,area),gold_sum_all)

6:接下来对需要使用groupByKey对数据进行分组,所以先使用map进行转换,因为我们要分区统计TopN,所以要根据大区分组。

map:(area,(uid,gold_sum_all)) groupByKey: area,<(uid,gold_sum_all),(uid,gold_sum_all),(uid,gold_sum_all)>

7:使用map迭代每个分组内的数据,按金币数量倒序排序,取前N个,最终输出area、topN 这个TopN其实就是把前几名主播的id还有金币数量拼接成一个字符串。 (area,topN)

8:使用foreach将结果打印到控制台,多个字段使用制表符分割。 area topN

pom.xml依赖如下:

...
<properties>
    <java.version>1.8</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <spring-boot.version>2.6.13</spring-boot.version>
    <spark-version>2.4.5</spark-version>
    <scala-version>2.11.12</scala-version>
    <guava-version>14.0.1</guava-version>
    <fastjson.versionn>1.2.68</fastjson.versionn>
</properties>
...
<dependencies>
   <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
   </dependency>

   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-test</artifactId>
       <scope>test</scope>
   </dependency>

   <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark-version}</version>
      <!--
        <scope>provided</scope>
      -->
       <exclusions>
           <exclusion>
               <groupId>org.scala-lang</groupId>
               <artifactId>scala-library</artifactId>
           </exclusion>
       </exclusions>
   </dependency>
   <!---->
   <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>${guava-version}</version>
   </dependency>

   <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala-version}</version>
   </dependency>

   <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>${fastjson.versionn}</version>
      <!--
        <scope>provided</scope>
      -->
   </dependency>
</dependencies>

scala代码实现如下:

package com.simoniu.scalademo

import com.alibaba.fastjson.JSON
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 需求:TopN主播统计
 * 1:首先获取两份数据中的核心字段,使用fastjson包解析数据
 * 主播开播记录(video_info.log):主播ID:uid,直播间ID:vid,大区:area
 * (vid,(uid,area))
 * 用户送礼记录(gift_record.log):直播间ID:vid,金币数量:gold
 * (vid,gold)
 *
 * 这样的话可以把这两份数据关联到一块就能获取到大区、主播id、金币这些信息了,使用直播间vid进行关联
 *
 * 2:对用户送礼记录数据进行聚合,对相同vid的数据求和
 * 因为用户可能在一次直播中给主播送多次礼物
 * (vid,gold_sum)
 *
 * 3:把这两份数据join到一块,vid作为join的key
 * (vid,((uid,area),gold_sum))
 *
 * 4:使用map迭代join之后的数据,最后获取到uid、area、gold_sum字段
 * 由于一个主播一天可能会开播多次,后面需要基于uid和area再做一次聚合,所以把数据转换成这种格式
 *
 * uid和area是一一对应的,一个人只能属于大区
 * ((uid,area),gold_sum)
 *
 * 5:使用reduceByKey算子对数据进行聚合
 * ((uid,area),gold_sum_all)
 *
 * 6:接下来对需要使用groupByKey对数据进行分组,所以先使用map进行转换
 * 因为我们要分区统计TopN,所以要根据大区分组
 * map:(area,(uid,gold_sum_all))
 * groupByKey: area,<(uid,gold_sum_all),(uid,gold_sum_all),(uid,gold_sum_all)>
 *
 * 7:使用map迭代每个分组内的数据,按照金币数量倒序排序,取前N个,最终输出area,topN
 * 这个topN其实就是把前几名主播的id还有金币数量拼接成一个字符串
 * (area,topN)
 *
 * 8:使用foreach将结果打印到控制台,多个字段使用制表符分割
 * area topN
 * Created by simoniu
 */
object TopNScalaDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("TopNScala")
      .setMaster("local")
    val sc = new SparkContext(conf)
    //1:首先获取两份数据中的核心字段,使用fastjson包解析数据
    val videoInfoRDD = sc.textFile("D:\\uploadFiles\\video_info.log")
    val giftRecordRDD = sc.textFile("D:\\uploadFiles\\gift_record.log")

    //(vid,(uid,area))
    val videoInfoFieldRDD = videoInfoRDD.map(line => {
      val jsonObj = JSON.parseObject(line)
      val vid = jsonObj.getString("vid")
      val uid = jsonObj.getString("uid")
      val area = jsonObj.getString("area")
      (vid, (uid, area))
    })

    //(vid,gold)
    val giftRecordFieldRDD = giftRecordRDD.map(line => {
      val jsonObj = JSON.parseObject(line)
      val vid = jsonObj.getString("vid")
      val gold = Integer.parseInt(jsonObj.getString("gold"))
      (vid, gold)
    })
    //2:对用户送礼记录数据进行聚合,对相同vid的数据求和
    //(vid,gold_sum)
    val giftRecordFieldAggRDD = giftRecordFieldRDD.reduceByKey(_ + _)

    //3:把这两份数据join到一块,vid作为join的key
    //(vid,((uid,area),gold_sum))
    val joinRDD = videoInfoFieldRDD.join(giftRecordFieldAggRDD)

    //4:使用map迭代join之后的数据,最后获取到uid、area、gold_sum字段
    //((uid,area),gold_sum)
    val joinMapRDD = joinRDD.map(tup => {
      //joinRDD: (vid,((uid,area),gold_sum))
      //获取uid
      val uid = tup._2._1._1
      //获取area
      val area = tup._2._1._2
      //获取gold_sum
      val gold_sum = tup._2._2
      ((uid, area), gold_sum)
    })

    //5:使用reduceByKey算子对数据进行聚合
    //((uid,area),gold_sum_all)
    val reduceRDD = joinMapRDD.reduceByKey(_ + _)

    //6:接下来对需要使用groupByKey对数据进行分组,所以先使用map进行转换
    //map:(area,(uid,gold_sum_all))
    //groupByKey: area,<(uid,gold_sum_all),(uid,gold_sum_all),(uid,gold_sum_all)>
    val groupRDD = reduceRDD.map(tup => (tup._1._2, (tup._1._1, tup._2))).groupByKey()

    //7:使用map迭代每个分组内的数据,按照金币数量倒序排序,取前N个,最终输出area,topN
    //(area,topN)
    val top3RDD = groupRDD.map(tup => {
      val area = tup._1
      //toList:把iterable转成list
      //sortBy:排序,默认是正序
      //reverse:反转,实现倒序效果
      //take(3):取前3个元素
      //mkString:使用指定字符把集合转成字符串
      //uid:gold_sum_all,uid:gold_sum_all,uid:gold_sum_all
      val top3 = tup._2.toList.sortBy(_._2).reverse.take(3).map(tup => tup._1 + ":" + tup._2).mkString(",")
      (area, top3)
    })

    //8:使用foreach将结果打印到控制台,多个字段使用制表符分割
    top3RDD.foreach(tup => println(tup._1 + "\t" + tup._2))

    sc.stop()
  }
}

运行结果:

CN  8407173251008:120,8407173251003:60,8407173251014:50
ID  8407173251005:160,8407173251010:140,8407173251002:70
US  8407173251015:180,8407173251012:70,8407173251001:60

Java代码实现如下:

package com.simoniu.sparkdemo.javademo;


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;

/**
 * 需求:TopN主播统计
 * Created by simoniu
 */
public class TopNJavaDemo {

    public static void main(String[] args) {
        //创建JavaSparkContext
        SparkConf conf = new SparkConf();
        conf.setAppName("TopNJava")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        //1:首先获取两份数据中的核心字段,使用fastjson包解析数据
        JavaRDD<String> videoInfoRDD = sc.textFile("D:\\uploadFiles\\video_info.log");
        JavaRDD<String> giftRecordRDD = sc.textFile("D:\\uploadFiles\\gift_record.log");
        //(vid,(uid,area))
        JavaPairRDD<String, Tuple2<String, String>> videoInfoFieldRDD = videoInfoRDD.mapToPair(new PairFunction<String, String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, Tuple2<String, String>> call(String line)
                    throws Exception {
                JSONObject jsonObj = JSON.parseObject(line);
                String vid = jsonObj.getString("vid");
                String uid = jsonObj.getString("uid");
                String area = jsonObj.getString("area");
                return new Tuple2<String, Tuple2<String, String>>(vid, new Tuple2<String, String>(uid, area));
            }
        });

        //(vid,gold)
        JavaPairRDD<String, Integer> giftRecordFieldRDD = giftRecordRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String line) throws Exception {
                JSONObject jsonObj = JSON.parseObject(line);
                String vid = jsonObj.getString("vid");
                Integer gold = Integer.parseInt(jsonObj.getString("gold"));
                return new Tuple2<String, Integer>(vid, gold);
            }
        });

        //2:对用户送礼记录数据进行聚合,对相同vid的数据求和
        //(vid,gold_sum)
        JavaPairRDD<String, Integer> giftRecordFieldAggRDD = giftRecordFieldRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1 + i2;
            }
        });

        //3:把这两份数据join到一块,vid作为join的key
        //(vid,((uid,area),gold_sum))
        //4:使用map迭代join之后的数据,最后获取到uid、area、gold_sum字段
        //((uid,area),gold_sum)
        JavaPairRDD<Tuple2<String, String>, Integer> joinMapRDD = videoInfoFieldRDD.join(giftRecordFieldAggRDD).mapToPair(new PairFunction<Tuple2<String, Tuple2<Tuple2<String, String>, Integer>>, Tuple2<String, String>, Integer>() {
            @Override
            public Tuple2<Tuple2<String, String>, Integer> call(Tuple2<String, Tuple2<Tuple2<String, String>, Integer>> tup)
                    throws Exception {
                //joinRDD:(vid,((uid,area),gold_sum))
                //获取uid
                String uid = tup._2._1._1;
                //获取area
                String area = tup._2._1._2;
                //获取gold_sum
                Integer gold_sum = tup._2._2;
                return new Tuple2<Tuple2<String, String>, Integer>(new Tuple2<String, String>(uid, area), gold_sum);
            }
        });
        //5:使用reduceByKey算子对数据进行聚合
        //((uid,area),gold_sum_all)
        JavaPairRDD<Tuple2<String, String>, Integer> reduceRDD = joinMapRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1 + i2;
            }
        });
        //6:接下来对需要使用groupByKey对数据进行分组,所以先使用map进行转换
        //map:(area,(uid,gold_sum_all))
        //groupByKey: area,<(uid,gold_sum_all),(uid,gold_sum_all),(uid,gold_sum_all)>
        JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> groupRDD = reduceRDD.mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Integer>, String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Tuple2<String, Integer>> call(Tuple2<Tuple2<String, String>, Integer> tup)
                    throws Exception {
                return new Tuple2<String, Tuple2<String, Integer>>(tup._1._2, new Tuple2<String, Integer>(tup._1._1, tup._2));
            }
        }).groupByKey();

        //7:使用map迭代每个分组内的数据,按照金币数量倒序排序,取前N个,最终输出area,topN
        //(area,topN)
        JavaRDD<Tuple2<String, String>> top3RDD = groupRDD.map(new Function<Tuple2<String, Iterable<Tuple2<String, Integer>>>, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> call(Tuple2<String, Iterable<Tuple2<String, Integer>>> tup)
                    throws Exception {
                String area = tup._1;
                ArrayList<Tuple2<String, Integer>> tupleList = Lists.newArrayList(tup._2);
                //对集合中的元素排序
                Collections.sort(tupleList, new Comparator<Tuple2<String, Integer>>() {
                    @Override
                    public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
                        return t2._2 - t1._2;
                    }
                });
                StringBuffer sb = new StringBuffer();
                for (int i = 0; i < tupleList.size(); i++) {
                    if (i < 3) {//top 3
                        Tuple2<String, Integer> t = tupleList.get(i);
                        if (i != 0) {
                            sb.append(",");
                        }
                        sb.append(t._1 + ":" + t._2);
                    }
                }
                return new Tuple2<String, String>(area, sb.toString());
            }
        });

        //8:使用foreach将结果打印到控制台,多个字段使用制表符分割
        //area  topN
        top3RDD.foreach(new VoidFunction<Tuple2<String, String>>() {
            @Override
            public void call(Tuple2<String, String> tup) throws Exception {
                System.out.println(tup._1+"\t"+tup._2);
            }
        });
        sc.stop();
    }
}