Converting an RDD to a DataFrame
Published: 2023-10-25 01:17:39

1. Converting an RDD to a DataFrame

Why convert an RDD to a DataFrame? In real work we often load raw log data from HDFS, process it, and end up with structured data that we want to analyze statistically. We could do this with Spark's transformation operators, but that is somewhat cumbersome, since every step has to be written as code; expressing the analysis in SQL is usually more convenient. So we can take an RDD we created earlier and convert it to a DataFrame, after which we can operate on the data either through the DataFrame operators or directly with SQL.

Spark SQL supports two ways of converting an RDD to a DataFrame: reflection and programmatic schema construction.

2. The reflection approach

Reflection is used to infer the metadata of the RDD. The reflection-based approach yields fairly concise code; it is a good choice when you already know the RDD's metadata at the time you write the program. Thanks to Scala's implicit conversions, the Scala API of Spark SQL can automatically convert an RDD of case class instances to a DataFrame.

1). Scala implementation

package com.simoniu.scalademo.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * Demo: convert an RDD to a DataFrame using reflection
 * Created by simoniu
 */
object RddToDataFrameByReflectScalaDemo {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
    //Create the SparkSession, which wraps both the SparkContext and the SqlContext
    val sparkSession = SparkSession.builder()
      .appName("RddToDataFrameByReflectScala")
      .config(conf)
      .getOrCreate()

    //Get the SparkContext
    val sc = sparkSession.sparkContext

    val dataRDD = sc.parallelize(Array(("jack",28),("tom",20),("jessic",30),("david",18)))

    //Convert dataRDD, with each tuple mapped to a Student object, to a DataFrame via reflection
    //The implicit conversions must be imported first
    import sparkSession.implicits._
    val stuDf = dataRDD.map(tup=>Student(tup._1,tup._2)).toDF()

    //Now the data in dataRDD can be manipulated through the DataFrame API
    stuDf.createOrReplaceTempView("student")

    //Run a SQL query
    val resDf = sparkSession.sql("select name,age from student where age > 20")

    //Convert the DataFrame back to an RDD
    val resRDD = resDf.rdd
    //Extract the fields from each Row, wrap them in a Student, and print to the console
    resRDD.map(row=>Student(row(0).toString,row(1).toString.toInt))
      .collect()
      .foreach(println(_))

    //Use Row's getAs() method to fetch a column value by name
    resRDD.map(row=>Student(row.getAs[String]("name"),row.getAs[Int]("age")))
      .collect()
      .foreach(println(_))

    sparkSession.stop()
  }
}

//Define the Student case class
case class Student(name: String,age: Int)

Output:

Student(jack,28)
Student(jessic,30)
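
As an aside, when the source data is already a tuple RDD, the case class can be skipped entirely: once the implicits are imported, `toDF` also accepts column names directly. A minimal sketch under the same local setup (the object name is made up for illustration):

```scala
package com.simoniu.scalademo.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object TupleToDataFrameSketch {

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("TupleToDataFrameSketch")
      .config(new SparkConf().setMaster("local"))
      .getOrCreate()
    val sc = sparkSession.sparkContext

    import sparkSession.implicits._
    //toDF can name the columns of a tuple RDD directly; no case class is needed
    val stuDf = sc.parallelize(Array(("jack", 28), ("tom", 20)))
      .toDF("name", "age")

    stuDf.createOrReplaceTempView("student")
    sparkSession.sql("select name, age from student where age > 20").show()

    sparkSession.stop()
  }
}
```

The trade-off is weaker typing: with a case class the field names and types are fixed at compile time, while `toDF("name", "age")` only supplies names at runtime.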

2). Java implementation

Student.java

package com.simoniu.sparkdemo.javademo.sql;

import java.io.Serializable;

public class Student implements Serializable {

    private String name;
    private Integer age;

    public Student() {

    }

    public Student(String name, Integer age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "Student{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}

RddToDataFrameByReflectJavaDemo.java

package com.simoniu.sparkdemo.javademo.sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;


/**
 * Demo: convert an RDD to a DataFrame using reflection
 * Created by simoniu
 */
public class RddToDataFrameByReflectJavaDemo {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        //Create the SparkSession, which wraps both the SparkContext and the SqlContext
        SparkSession sparkSession = SparkSession.builder()
                .appName("RddToDataFrameByReflectJava")
                .config(conf)
                .getOrCreate();

        //Get the SparkContext
        //sparkSession exposes the Scala SparkContext, so it has to be wrapped into a JavaSparkContext
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
        Tuple2<String, Integer> t1 = new Tuple2<String, Integer>("jack", 28);
        Tuple2<String, Integer> t2 = new Tuple2<String, Integer>("tom", 20);
        Tuple2<String, Integer> t3 = new Tuple2<String, Integer>("jessic", 30);
        Tuple2<String, Integer> t4 = new Tuple2<String, Integer>("david", 18);

        JavaRDD<Tuple2<String, Integer>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));


        JavaRDD<Student> stuRDD = dataRDD.map(new Function<Tuple2<String, Integer>, Student>() {
            @Override
            public Student call(Tuple2<String, Integer> tup) throws Exception {
                return new Student(tup._1, tup._2);
            }
        });

        //Note: the Student class must be declared public and must implement Serializable
        Dataset<Row> stuDf = sparkSession.createDataFrame(stuRDD, Student.class);

        stuDf.createOrReplaceTempView("student");
        //Run a SQL query
        Dataset<Row> resDf = sparkSession.sql("select name,age from student where age > 20");
        //Convert the DataFrame back to an RDD; note that a JavaRDD is needed here
        JavaRDD<Row> resRDD = resDf.javaRDD();
        //Extract the fields from each Row, wrap them in a Student, and print to the console
        List<Student> resList = resRDD.map(new Function<Row, Student>() {
            @Override
            public Student call(Row row) throws Exception {
                //return new Student(row.getString(0), row.getInt(1));
                //Fetch the values by column name via getAs
                return new Student(row.getAs("name").toString(), Integer.parseInt(row.getAs("age").toString()));
            }
        }).collect();
        for (Student stu : resList) {
            System.out.println(stu);
        }

        sparkSession.stop();

    }
}

3. The programmatic approach

The programmatic approach creates the DataFrame through a programming interface: you build the metadata, i.e. the schema, dynamically at runtime and then apply it to an existing RDD. The code is more verbose, but if the RDD's metadata is unknown when you write the program and only becomes available at runtime, dynamically constructing the schema is the only option. In other words, when the fields of a case class cannot be defined in advance, you have to specify the metadata programmatically.

1). Scala implementation

package com.simoniu.scalademo.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
 * Demo: convert an RDD to a DataFrame programmatically
 * Created by simoniu
 */
object RddToDataFrameByProgramScalaDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
    //Create the SparkSession, which wraps both the SparkContext and the SqlContext
    val sparkSession = SparkSession.builder()
      .appName("RddToDataFrameByProgramScala")
      .config(conf)
      .getOrCreate()

    //Get the SparkContext
    val sc = sparkSession.sparkContext

    val dataRDD = sc.parallelize(Array(("jack",28),("tom",20),("jessic",30),("david",18)))
    //Assemble the rowRDD
    val rowRDD = dataRDD.map(tup=>Row(tup._1,tup._2))
    //Specify the metadata; this schema could be obtained dynamically from an external source, which is what makes the approach flexible
    val schema = StructType(Array(
      StructField("name",StringType,true),
      StructField("age",IntegerType,true)
    ))
    //Assemble the DataFrame
    val stuDf = sparkSession.createDataFrame(rowRDD,schema)

    //Now the data in dataRDD can be manipulated through the DataFrame API
    stuDf.createOrReplaceTempView("student")

    //Run a SQL query
    val resDf = sparkSession.sql("select name,age from student where age > 20")

    //Convert the DataFrame back to an RDD
    val resRDD = resDf.rdd

    resRDD.map(row=>(row(0).toString,row(1).toString.toInt))
      .collect()
      .foreach(println(_))

    sparkSession.stop()
  }
}
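
To make the "runtime" aspect tangible, the schema in the example above can itself be derived from data that is only known when the job runs. The sketch below assumes, purely for illustration, that a schema description arrives as a string such as "name:string,age:int" (for example from a config file or a job argument) and is parsed into a StructType:

```scala
package com.simoniu.scalademo.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DataType, IntegerType, StringType, StructField, StructType}

object DynamicSchemaSketch {

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("DynamicSchemaSketch")
      .config(new SparkConf().setMaster("local"))
      .getOrCreate()
    val sc = sparkSession.sparkContext

    //Hypothetical runtime input; in a real job this could come from a config file or argument
    val schemaString = "name:string,age:int"
    val fields = schemaString.split(",").map { column =>
      val Array(fieldName, typeName) = column.split(":")
      //Map the type name onto a Spark SQL DataType (only two types handled in this sketch)
      val dataType: DataType = typeName match {
        case "string" => StringType
        case "int"    => IntegerType
      }
      StructField(fieldName, dataType, nullable = true)
    }
    val schema = StructType(fields)

    val rowRDD = sc.parallelize(Array(("jack", 28), ("tom", 20)))
      .map(tup => Row(tup._1, tup._2))
    val stuDf = sparkSession.createDataFrame(rowRDD, schema)
    stuDf.printSchema()

    sparkSession.stop()
  }
}
```

The point is that nothing about the schema is fixed at compile time; the same job could build a completely different StructType from a different schema string.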

2). Java implementation

package com.simoniu.sparkdemo.javademo.sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Demo: convert an RDD to a DataFrame programmatically
 * Created by simoniu
 */
public class RddToDataFrameByProgramJavaDemo {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        //Create the SparkSession, which wraps both the SparkContext and the SqlContext
        SparkSession sparkSession = SparkSession.builder()
                .appName("RddToDataFrameByProgramJava")
                .config(conf)
                .getOrCreate();

        //Get the SparkContext
        //sparkSession exposes the Scala SparkContext, so it has to be wrapped into a JavaSparkContext
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
        Tuple2<String, Integer> t1 = new Tuple2<String, Integer>("jack", 28);
        Tuple2<String, Integer> t2 = new Tuple2<String, Integer>("tom", 20);
        Tuple2<String, Integer> t3 = new Tuple2<String, Integer>("jessic", 30);
        Tuple2<String, Integer> t4 = new Tuple2<String, Integer>("david", 18);

        JavaRDD<Tuple2<String, Integer>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));

        //Assemble the rowRDD
        JavaRDD<Row> rowRDD = dataRDD.map(new Function<Tuple2<String, Integer>, Row>() {
            @Override
            public Row call(Tuple2<String, Integer> tup) throws Exception {
                return RowFactory.create(tup._1, tup._2);
            }
        });
        //Specify the metadata
        ArrayList<StructField> structFieldList = new ArrayList<StructField>();
        structFieldList.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        structFieldList.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        StructType schema = DataTypes.createStructType(structFieldList);
        //Build the DataFrame
        Dataset<Row> stuDf = sparkSession.createDataFrame(rowRDD, schema);

        stuDf.createOrReplaceTempView("student");
        //Run a SQL query
        Dataset<Row> resDf = sparkSession.sql("select name,age from student where age > 20");

        //Convert the DataFrame back to an RDD; note that a JavaRDD is needed here
        JavaRDD<Row> resRDD = resDf.javaRDD();

        List<Tuple2<String, Integer>> resList = resRDD.map(new Function<Row, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> call(Row row) throws Exception {
                return new Tuple2<String, Integer>(row.getString(0), row.getInt(1));
            }
        }).collect();

        for(Tuple2<String,Integer> tup : resList){
            System.out.println(tup);
        }
        sparkSession.stop();
    }
}