← 返回首页
Spark SQL入门
发表时间:2023-10-24 08:47:53
Spark SQL入门

Spark SQL是Spark中的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象,就是DataFrame。

1.Spark SQL

Spark SQL和我们之前讲Hive的时候说的hive on spark是不一样的。hive on spark是表示把底层的mapreduce引擎替换为spark引擎。而Spark SQL是Spark自己实现的一套SQL处理引擎。

Spark SQL是Spark中的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象,就是DataFrame。简单理解就是: DataFrame=RDD+Schema。

它其实和关系型数据库中的表非常类似,RDD可以认为是表中的数据,Schema是表结构信息。DataFrame可以通过很多来源进行构建,包括:结构化的数据文件,Hive中的表,外部的关系型数据库,以及RDD。

Spark1.3出现的DataFrame,Spark1.6出现了DataSet,在Spark2.0中两者统一,DataFrame等于DataSet[Row]

2.SparkSession

要使用Spark SQL,首先需要创建一个SpakSession对象,SparkSession中包含了SparkContext和SqlContext 所以说想通过SparkSession来操作RDD的话需要先通过它来获取SparkContext,这个SqlContext是使用sparkSQL操作hive的时候会用到的。

3.创建DataFrame

使用SparkSession,可以从RDD、HIve表或者其它数据源创建DataFrame,那下面我们来使用JSON文件来创建一个DataFrame。

案例:使用JSON文件来创建一个DataFrame

在项目中添加sql这个包名。

首先使用spark-sql需要先添加spark-sql和janino的依赖。

<properties>
    <java.version>1.8</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <spring-boot.version>2.6.13</spring-boot.version>
    <spark-version>2.4.5</spark-version>
    <scala-version>2.11.12</scala-version>
    <janino-version>3.0.8</janino-version>
    <guava-version>14.0.1</guava-version>
    <fastjson.versionn>1.2.68</fastjson.versionn>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
   </dependency>

   <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.11</artifactId>
       <version>${spark-version}</version>
       <!--
         <scope>provided</scope>
       -->
        <exclusions>
            <exclusion>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
            </exclusion>
        </exclusions>
   </dependency>

   <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-sql_2.11</artifactId>
       <version>${spark-version}</version>
   </dependency>

   <dependency>
       <groupId>org.codehaus.janino</groupId>
       <artifactId>janino</artifactId>
       <version>${janino-version}</version>
   </dependency>

   <!---->
   <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
       <version>${guava-version}</version>
   </dependency>

   <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
       <version>${scala-version}</version>
   </dependency>

   <dependency>
       <groupId>com.alibaba</groupId>
       <artifactId>fastjson</artifactId>
       <version>${fastjson.versionn}</version>
       <!--
         <scope>provided</scope>
       -->
    </dependency>
</dependencies>

students.json测试数据。

{"name":"jackson","age":18,"gender":"male"}
{"name":"tom","age":19,"gender":"female"}
{"name":"jerry","age":17,"gender":"male"}
{"name":"bob","age":22,"gender":"female"}
{"name":"backham","age":21,"gender":"male"}
{"name":"richad","age":20,"gender":"male"}

1).scala实现

package com.simoniu.scalademo.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * 需求:使用json文件创建DataFrame
 * Created by simoniu
 */
object SparkSqlScalaDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
    //创建SparkSession对象,里面包含SparkContext和SqlContext
    val sparkSession = SparkSession.builder()
      .appName("SqlDemoScala")
      .config(conf)
      .getOrCreate()
    //读取json文件,获取DataFrame
    val stuDf = sparkSession.read.json("D:\\uploadFiles\\students.json")

    //查看DataFrame中的数据
    stuDf.show()

    sparkSession.stop()
  }
}

2).java实现

package com.simoniu.sparkdemo.javademo.sql;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * 需求:使用json文件创建DataFrame
 * Created by simoniu
 */
public class SparkSqlJavaDemo {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        //创建SparkSession对象,里面包含SparkContext和SqlContext
        SparkSession sparkSession = SparkSession.builder()
                .appName("SqlDemoJava")
                .config(conf)
                .getOrCreate();

        //读取json文件,获取Dataset<Row>
        Dataset<Row> stuDf = sparkSession.read().json("D:\\uploadFiles\\students.json");

        stuDf.show();
        sparkSession.stop();
    }
}

运行结果:

+---+------+-------+
|age|gender|   name|
+---+------+-------+
| 18|  male|jackson|
| 19|female|    tom|
| 17|  male|  jerry|
| 22|female|    bob|
| 21|  male|backham|
| 20|  male| richad|
+---+------+-------+

由于DataFrame等于DataSet[Row],它们两个可以互相转换,所以创建哪个都是一样的,前面的scala代码默认创建的是DataFrame,java代码默认创建的是DataSet,尝试对他们进行转换。

在Scala代码中将DataFrame转换为DataSet[Row],对后面的操作没有影响。

//将DataFrame转换为DataSet[Row]
val stuDf = sparkSession.read.json("D:\\uploadFiles\\students.json").as("stu")

在Java代码中将DataSet[Row]转换为DataFrame。

//将Dataset<Row>转换为DataFrame
Dataset<Row> stuDf = sparkSession.read().json("D:\\uploadFiles\\students.json").toDF();