← 返回首页
使用DataSet创建表
发表时间:2023-11-28 14:10:47
使用DataSet创建表

使用DataSet创建表。

1.使用DataSet创建表

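注意:DataSet与表之间的转换只能使用旧的执行引擎(旧版planner),新的Blink执行引擎不支持与DataSet之间的相互转换。另外,BatchTableEnvironment已在Flink 1.14中随旧执行引擎一并移除,因此本文示例仅适用于Flink 1.13及更早的版本。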

1). Scala实现

package com.simoniu.flink.scala.tablesql

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.{BatchTableEnvironment, StreamTableEnvironment}

/**
 * Demo: exposing a DataSet to the Table API in two ways, as a temporary view
 * queried with SQL and as a `Table` object queried with the expression DSL.
 * Both queries keep only the rows whose `id` is greater than 1 and print them.
 *
 * Created by simoniu
 */
object DataSetToTableScalaDemo {
  def main(args: Array[String]): Unit = {
    // Batch execution environment plus the table environment bound to it.
    val batchEnv = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = BatchTableEnvironment.create(batchEnv)

    // Sample data: (id, name, gender) tuples turned into a DataSet.
    import org.apache.flink.api.scala._
    val people = batchEnv.fromCollection(Array((1, "jack", "female"), (2, "tom", "male"), (3, "mike", "male")))

    // Approach 1: register the DataSet as a temporary view and query it with SQL.
    import org.apache.flink.table.api._
    tableEnv.createTemporaryView("myTable", people, 'id, 'name, 'gender)
    val sqlResult = tableEnv.sqlQuery("select * from myTable where id > 1")
    sqlResult.execute().print()

    // Approach 2: convert the DataSet into a Table and use the expression DSL.
    // Note: the symbol form ('id, 'name) used above and the interpolator form
    // ($"id", $"name") used here are interchangeable ways to name the columns.
    val peopleTable = tableEnv.fromDataSet(people, $"id", $"name", $"gender")
    val projected = peopleTable.select($"id", $"name", $"gender")
    val idAboveOne = projected.filter($"id" > 1)
    idAboveOne.execute().print()
  }
}

运行结果:

+-------------+--------------------------------+--------------------------------+
|          id |                           name |                         gender |
+-------------+--------------------------------+--------------------------------+
|           2 |                            tom |                           male |
|           3 |                           mike |                           male |
+-------------+--------------------------------+--------------------------------+
2 rows in set
+-------------+--------------------------------+--------------------------------+
|          id |                           name |                         gender |
+-------------+--------------------------------+--------------------------------+
|           2 |                            tom |                           male |
|           3 |                           mike |                           male |
+-------------+--------------------------------+--------------------------------+
2 rows in set

2). Java实现

package com.simoniu.flink.java.tablesql;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.util.ArrayList;
import static org.apache.flink.table.api.Expressions.$;

/**
 * Demo: exposing a DataSet to the Table API in two ways, as a temporary view
 * queried with SQL and as a {@code Table} object queried with the expression DSL.
 * Both queries keep only the rows whose {@code id} is greater than 1 and print them.
 *
 * Created by simoniu
 */
public class DataSetToTableJavaDemo {
    public static void main(String[] args) {
        // Batch execution environment plus the table environment bound to it.
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(batchEnv);

        // Sample data: (id, name, gender) tuples turned into a DataSet.
        ArrayList<Tuple3<Integer, String, String>> rows = new ArrayList<>();
        rows.add(new Tuple3<>(1, "jack", "male"));
        rows.add(new Tuple3<>(2, "tom", "female"));
        rows.add(new Tuple3<>(3, "mike", "male"));
        DataSource<Tuple3<Integer, String, String>> people = batchEnv.fromCollection(rows);

        // Approach 1: register the DataSet as a temporary view and query it with SQL.
        tableEnv.createTemporaryView("myTable", people, $("id"), $("name"), $("gender"));
        Table sqlResult = tableEnv.sqlQuery("select * from myTable where id > 1");
        sqlResult.execute().print();

        // Approach 2: convert the DataSet into a Table and use the expression DSL.
        Table peopleTable = tableEnv.fromDataSet(people, $("id"), $("name"), $("gender"));
        Table projected = peopleTable.select($("id"), $("name"), $("gender"));
        Table idAboveOne = projected.filter($("id").isGreater(1));
        idAboveOne.execute().print();
    }
}