Spark SQL是Spark中的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象,就是DataFrame。
Spark SQL和我们之前讲Hive的时候说的hive on spark是不一样的。hive on spark是表示把底层的mapreduce引擎替换为spark引擎。而Spark SQL是Spark自己实现的一套SQL处理引擎。
Spark SQL是Spark中的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象,就是DataFrame。简单理解就是: DataFrame=RDD+Schema。
它其实和关系型数据库中的表非常类似,RDD可以认为是表中的数据,Schema是表结构信息。DataFrame可以通过很多来源进行构建,包括:结构化的数据文件,Hive中的表,外部的关系型数据库,以及RDD。
Spark1.3出现的DataFrame,Spark1.6出现了DataSet,在Spark2.0中两者统一,DataFrame等于DataSet[Row]
要使用Spark SQL,首先需要创建一个SpakSession对象,SparkSession中包含了SparkContext和SqlContext 所以说想通过SparkSession来操作RDD的话需要先通过它来获取SparkContext,这个SqlContext是使用sparkSQL操作hive的时候会用到的。
使用SparkSession,可以从RDD、HIve表或者其它数据源创建DataFrame,那下面我们来使用JSON文件来创建一个DataFrame。
案例:使用JSON文件来创建一个DataFrame
在项目中添加sql这个包名。

首先使用spark-sql需要先添加spark-sql和janino的依赖。
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.6.13</spring-boot.version>
<spark-version>2.4.5</spark-version>
<scala-version>2.11.12</scala-version>
<janino-version>3.0.8</janino-version>
<guava-version>14.0.1</guava-version>
<fastjson.versionn>1.2.68</fastjson.versionn>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark-version}</version>
<!--
<scope>provided</scope>
-->
<exclusions>
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark-version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>${janino-version}</version>
</dependency>
<!---->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava-version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala-version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.versionn}</version>
<!--
<scope>provided</scope>
-->
</dependency>
</dependencies>
students.json测试数据。
{"name":"jackson","age":18,"gender":"male"}
{"name":"tom","age":19,"gender":"female"}
{"name":"jerry","age":17,"gender":"male"}
{"name":"bob","age":22,"gender":"female"}
{"name":"backham","age":21,"gender":"male"}
{"name":"richad","age":20,"gender":"male"}
1).scala实现
package com.simoniu.scalademo.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* 需求:使用json文件创建DataFrame
* Created by simoniu
*/
object SparkSqlScalaDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
//创建SparkSession对象,里面包含SparkContext和SqlContext
val sparkSession = SparkSession.builder()
.appName("SqlDemoScala")
.config(conf)
.getOrCreate()
//读取json文件,获取DataFrame
val stuDf = sparkSession.read.json("D:\\uploadFiles\\students.json")
//查看DataFrame中的数据
stuDf.show()
sparkSession.stop()
}
}
2).java实现
package com.simoniu.sparkdemo.javademo.sql;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
* 需求:使用json文件创建DataFrame
* Created by simoniu
*/
public class SparkSqlJavaDemo {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local");
//创建SparkSession对象,里面包含SparkContext和SqlContext
SparkSession sparkSession = SparkSession.builder()
.appName("SqlDemoJava")
.config(conf)
.getOrCreate();
//读取json文件,获取Dataset<Row>
Dataset<Row> stuDf = sparkSession.read().json("D:\\uploadFiles\\students.json");
stuDf.show();
sparkSession.stop();
}
}
运行结果:
+---+------+-------+
|age|gender| name|
+---+------+-------+
| 18| male|jackson|
| 19|female| tom|
| 17| male| jerry|
| 22|female| bob|
| 21| male|backham|
| 20| male| richad|
+---+------+-------+
由于DataFrame等于DataSet[Row],它们两个可以互相转换,所以创建哪个都是一样的,前面的scala代码默认创建的是DataFrame,java代码默认创建的是DataSet,尝试对他们进行转换。
在Scala代码中将DataFrame转换为DataSet[Row],对后面的操作没有影响。
//将DataFrame转换为DataSet[Row]
val stuDf = sparkSession.read.json("D:\\uploadFiles\\students.json").as("stu")
在Java代码中将DataSet[Row]转换为DataFrame。
//将Dataset<Row>转换为DataFrame
Dataset<Row> stuDf = sparkSession.read().json("D:\\uploadFiles\\students.json").toDF();