RDD转换为DataFrame。
为什么要将RDD转换为DataFrame? 在实际工作中我们可能会先把hdfs上的一些日志数据加载进来,然后进行一些处理,最终变成结构化的数据,希望对这些数据做一些统计分析,当然了我们可以使用spark中提供的transformation算子来实现,只不过会有一些麻烦,毕竟是需要写代码的,如果能够使用sql实现,其实是更加方便的。 所以可以针对我们前面创建的RDD,将它转换为DataFrame,这样就可以使用dataFrame中的一些算子或者直接写sql来操作数据了。
Spark SQL支持以下两种方式将RDD转换为DataFrame:一是使用反射推断RDD中的元数据,二是通过编程接口动态指定元数据。
使用反射来推断RDD中的元数据。基于反射的方式,代码比较简洁,也就是说当你在写代码的时候,已经知道了RDD中的元数据,这样的话使用反射这种方式是一种非常不错的选择。Scala具有隐式转换的特性,所以spark sql的scala接口是支持自动将包含了case class的RDD转换为DataFrame。
1).scala代码实现
package com.simoniu.scalademo.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* 需求:使用反射方式实现RDD转换为DataFrame
* Created by simoniu
*/
/**
 * Demo: convert an RDD to a DataFrame via reflection on a case class.
 *
 * Importing sparkSession.implicits._ lets an RDD of case-class instances
 * gain the toDF() method, with the schema inferred from the class fields.
 */
object RddToDataFrameByReflectScalaDemo {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local")
    // SparkSession bundles both the SparkContext and the SQL context.
    val session = SparkSession.builder()
      .appName("RddToDataFrameByReflectScala")
      .config(conf)
      .getOrCreate()
    val sc = session.sparkContext

    val pairs = sc.parallelize(Array(("jack", 28), ("tom", 20), ("jessic", 30), ("david", 18)))

    // The implicits import enables .toDF() on an RDD of case classes.
    import session.implicits._
    val studentDf = pairs.map { case (name, age) => Student(name, age) }.toDF()

    // Register a temporary view so the data can be queried with SQL.
    studentDf.createOrReplaceTempView("student")
    val filtered = session.sql("select name,age from student where age > 20")

    // Back to an RDD of Row for manual extraction.
    val filteredRows = filtered.rdd

    // Extract columns by position, rebuild Student instances, print them.
    filteredRows
      .map(row => Student(row(0).toString, row(1).toString.toInt))
      .collect()
      .foreach(println)

    // Same extraction, but by column name via Row.getAs.
    filteredRows
      .map(row => Student(row.getAs[String]("name"), row.getAs[Int]("age")))
      .collect()
      .foreach(println)

    session.stop()
  }
}
// Student record; its fields (name, age) become the DataFrame schema via reflection.
case class Student(name: String,age: Int)
运行结果:
Student(jack,28)
Student(jessic,30)
2).java代码实现
Student.java
package com.simoniu.sparkdemo.javademo.sql;
import java.io.Serializable;
/**
 * Serializable JavaBean used by Spark's reflection-based DataFrame
 * creation; the schema is derived from the bean's getters.
 * The class must be public and Serializable so Spark can ship it
 * to executors and inspect it.
 */
public class Student implements Serializable {

    private String name;
    private Integer age;

    /** No-arg constructor required by the JavaBean convention. */
    public Student() {
    }

    public Student(String name, Integer age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public Integer getAge() {
        return age;
    }

    public void setName(String name) {
        this.name = name;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "Student{" + "name='" + name + '\'' + ", age=" + age + '}';
    }
}
RddToDataFrameByReflectJavaDemo.java
package com.simoniu.sparkdemo.javademo.sql;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
/**
* 需求:使用反射方式实现RDD转换为DataFrame
* Created by simoniu
*/
/**
 * Demo: convert a JavaRDD to a DataFrame through reflection on a JavaBean.
 */
public class RddToDataFrameByReflectJavaDemo {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        // SparkSession bundles the SparkContext and the SQL context.
        SparkSession session = SparkSession.builder()
                .appName("RddToDataFrameByReflectJava")
                .config(conf)
                .getOrCreate();
        // sparkSession exposes the Scala SparkContext; wrap it in the
        // Java-friendly JavaSparkContext.
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(session.sparkContext());

        List<Tuple2<String, Integer>> people = Arrays.asList(
                new Tuple2<String, Integer>("jack", 28),
                new Tuple2<String, Integer>("tom", 20),
                new Tuple2<String, Integer>("jessic", 30),
                new Tuple2<String, Integer>("david", 18));
        JavaRDD<Tuple2<String, Integer>> pairRDD = jsc.parallelize(people);

        // Spark's Function is a functional interface, so a lambda replaces
        // the anonymous inner class.
        JavaRDD<Student> studentRDD = pairRDD.map(tup -> new Student(tup._1, tup._2));

        // Note: Student must be declared public and implement Serializable.
        Dataset<Row> studentDf = session.createDataFrame(studentRDD, Student.class);
        studentDf.createOrReplaceTempView("student");

        Dataset<Row> filtered = session.sql("select name,age from student where age > 20");

        // Convert the DataFrame back to a JavaRDD of Row and rebuild
        // Student objects, reading columns by name via Row.getAs.
        List<Student> result = filtered.javaRDD()
                .map(row -> new Student(row.getAs("name").toString(),
                        Integer.parseInt(row.getAs("age").toString())))
                .collect();
        for (Student stu : result) {
            System.out.println(stu);
        }

        session.stop();
    }
}
编程的方式是通过编程接口来创建DataFrame,你可以在程序运行时动态构建一份元数据,就是Schema,然后将其应用到已经存在的RDD上。这种方式的代码比较冗长,但是如果在编写程序时,还不知道RDD的元数据,只有在程序运行时,才能动态得知其元数据,那么只能通过这种动态构建元数据的方式。也就是说当case class中的字段无法预先定义的时候,就只能用编程方式动态指定元数据了。
1).scala实现
package com.simoniu.scalademo.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
/**
* 需求:使用编程方式实现RDD转换为DataFrame
* Created by simoniu
*/
/**
 * Demo: convert an RDD to a DataFrame by building the schema (StructType)
 * programmatically at runtime instead of relying on a case class.
 * Useful when the column layout is only known while the job is running.
 */
object RddToDataFrameByProgramScalaDemo {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local")
    // SparkSession bundles the SparkContext and the SQL context.
    val session = SparkSession.builder()
      .appName("RddToDataFrameByProgramScala")
      .config(conf)
      .getOrCreate()
    val sc = session.sparkContext

    val pairs = sc.parallelize(Array(("jack", 28), ("tom", 20), ("jessic", 30), ("david", 18)))

    // Wrap every tuple in a generic Row.
    val rows = pairs.map { case (name, age) => Row(name, age) }

    // The schema is plain data, so it could equally be assembled from
    // configuration or external input at runtime.
    val schema = StructType(
      StructField("name", StringType, nullable = true) ::
        StructField("age", IntegerType, nullable = true) ::
        Nil
    )

    val studentDf = session.createDataFrame(rows, schema)
    studentDf.createOrReplaceTempView("student")

    val filtered = session.sql("select name,age from student where age > 20")

    // Pull the results back as (name, age) pairs and print them.
    filtered.rdd
      .map(row => (row(0).toString, row(1).toString.toInt))
      .collect()
      .foreach(println)

    session.stop()
  }
}
2).java实现
package com.simoniu.sparkdemo.javademo.sql;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* 需求:使用编程方式实现RDD转换为DataFrame
* Created by simoniu
*/
/**
 * Demo: convert a JavaRDD to a DataFrame by assembling the schema
 * (StructType) programmatically at runtime.
 */
public class RddToDataFrameByProgramJavaDemo {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        // SparkSession bundles the SparkContext and the SQL context.
        SparkSession session = SparkSession.builder()
                .appName("RddToDataFrameByProgramJava")
                .config(conf)
                .getOrCreate();
        // sparkSession exposes the Scala SparkContext; wrap it in the
        // Java-friendly JavaSparkContext.
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(session.sparkContext());

        List<Tuple2<String, Integer>> people = Arrays.asList(
                new Tuple2<String, Integer>("jack", 28),
                new Tuple2<String, Integer>("tom", 20),
                new Tuple2<String, Integer>("jessic", 30),
                new Tuple2<String, Integer>("david", 18));

        // Turn each tuple into a generic Row (lambda instead of an
        // anonymous Function class).
        JavaRDD<Row> rowRDD = jsc.parallelize(people)
                .map(tup -> RowFactory.create(tup._1, tup._2));

        // The schema is built as data, so it can be supplied dynamically.
        List<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        StructType schema = DataTypes.createStructType(fields);

        Dataset<Row> studentDf = session.createDataFrame(rowRDD, schema);
        studentDf.createOrReplaceTempView("student");

        Dataset<Row> filtered = session.sql("select name,age from student where age > 20");

        // Collect the rows back as (name, age) tuples and print each one.
        List<Tuple2<String, Integer>> result = filtered.javaRDD()
                .map(row -> new Tuple2<String, Integer>(row.getString(0), row.getInt(1)))
                .collect();
        for (Tuple2<String, Integer> tup : result) {
            System.out.println(tup);
        }

        session.stop();
    }
}