← 返回首页
使用DataStream创建表
发表时间:2023-11-27 16:03:26
使用DataStream创建表

使用DataStream创建表。

1.DataStream、DataSet和Table之间的互相转换

Table API和SQL可以很容易地与DataStream和DataSet程序集成在一起。通过TableEnvironment,可以把DataStream或者DataSet注册为Table,这样就可以使用Table API和SQL进行查询了。反过来,通过TableEnvironment也可以把Table对象转换为DataStream或者DataSet,这样就可以继续使用DataStream或者DataSet中的相关API了。

2.使用DataStream创建表

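使用DataStream创建表,主要包含两种情况:第一种是通过createTemporaryView将DataStream注册为临时视图,然后使用SQL查询;第二种是通过fromDataStream将DataStream转换为Table对象,然后使用Table API查询。下面分别给出Scala和Java的实现: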

1).scala实现

package com.simoniu.flink.scala.tablesql

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * Demonstrates the two ways of exposing a DataStream to the Table API / SQL:
 * registering it as a temporary view, and converting it into a Table object.
 * Created by simoniu
 */
object DataStreamToTableScalaDemo {
  def main(args: Array[String]): Unit = {
    // Build the streaming execution environment and the table environment on top of it.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // Source data: (id, name, gender) tuples turned into a DataStream.
    // The wildcard import supplies the implicit type information fromCollection needs.
    import org.apache.flink.api.scala._
    val people = Seq(
      (1, "jack", "male"),
      (2, "tom", "female"),
      (3, "mike", "male")
    )
    val rows = env.fromCollection(people)

    // Approach 1: register the DataStream as a temporary view and query it with SQL.
    import org.apache.flink.table.api._
    tableEnv.createTemporaryView("myTable", rows, 'id, 'name, 'gender)
    val sqlResult = tableEnv.sqlQuery("select * from myTable where id > 1")
    sqlResult.execute().print()

    // Approach 2: convert the DataStream into a Table object and query it with the Table API.
    val peopleTable = tableEnv.fromDataStream(rows, $"id", $"name", $"gender")
    val filtered = peopleTable
      .select($"id", $"name", $"gender")
      .filter($"id" > 1)
    filtered.execute().print()

    // Note: the 'id, 'name form and the $"id", $"name" form have the same effect.
  }
}

运行结果:

+-------------+--------------------------------+--------------------------------+
|          id |                           name |                         gender |
+-------------+--------------------------------+--------------------------------+
|           2 |                            tom |                         female |
|           3 |                           mike |                           male |
+-------------+--------------------------------+--------------------------------+
2 rows in set
+-------------+--------------------------------+--------------------------------+
|          id |                           name |                         gender |
+-------------+--------------------------------+--------------------------------+
|           2 |                            tom |                         female |
|           3 |                           mike |                           male |
+-------------+--------------------------------+--------------------------------+

2).java实现

package com.simoniu.flink.java.tablesql;

import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.util.ArrayList;
import static org.apache.flink.table.api.Expressions.$;

/**
 * Demonstrates the two ways of exposing a DataStream to the Table API / SQL:
 * registering it as a temporary view, and converting it into a Table object.
 * Created by simoniu
 */
public class DataStreamToTableJavaDemo {
    public static void main(String[] args) {
        // Build the streaming execution environment and the table environment on top of it.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Source data: (id, name, gender) tuples turned into a DataStream.
        ArrayList<Tuple3<Integer, String, String>> people = new ArrayList<>();
        people.add(new Tuple3<>(1, "jack", "male"));
        people.add(new Tuple3<>(2, "tom", "female"));
        people.add(new Tuple3<>(3, "mike", "male"));
        DataStream<Tuple3<Integer, String, String>> rows = env.fromCollection(people);

        // Approach 1: register the DataStream as a temporary view and query it with SQL.
        tableEnv.createTemporaryView("myTable", rows, $("id"), $("name"), $("gender"));
        Table sqlResult = tableEnv.sqlQuery("select * from myTable where id > 1");
        sqlResult.execute().print();

        // Approach 2: convert the DataStream into a Table object and query it with the Table API.
        Table peopleTable = tableEnv.fromDataStream(rows, $("id"), $("name"), $("gender"));
        Table filtered = peopleTable
                .select($("id"), $("name"), $("gender"))
                .filter($("id").isGreater(1));
        filtered.execute().print();
    }
}