← 返回首页
TableEnvironment对象
发表时间:2023-11-26 15:40:25
TableEnvironment对象

TableEnvironment对象。

1.TableEnvironment对象

使用Table API和SQL,首先要创建一个TableEnvironment对象。接下来我们分别使用Scala和Java实现TableEnvironment对象的创建。

1).scala实现

package com.simoniu.flink.scala.tablesql

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.{BatchTableEnvironment, StreamTableEnvironment}
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

/**
 * Demo showing the different ways of creating a table environment.
 * Created by simoniu
 */
object CreateTableEnvironmentScalaDemo {

  /**
   * Starts a fresh settings builder with the Blink planner selected explicitly.
   * Per the original author's note: since 1.11 Blink is the default engine for
   * Table API and SQL, and it is the recommended one for production.
   */
  private def blinkSettingsBuilder: EnvironmentSettings.Builder =
    EnvironmentSettings.newInstance().useBlinkPlanner()

  def main(args: Array[String]): Unit = {
    // Case 1: Table API / SQL does NOT need to be converted to or from
    // DataStream / DataSet. A plain TableEnvironment is then sufficient for
    // both streaming and batch processing.

    // Blink planner, streaming mode.
    val streamingSettings: EnvironmentSettings = blinkSettingsBuilder.inStreamingMode().build()
    val streamingTableEnv = TableEnvironment.create(streamingSettings)

    // Blink planner, batch mode.
    val batchSettings: EnvironmentSettings = blinkSettingsBuilder.inBatchMode().build()
    val batchTableEnv = TableEnvironment.create(batchSettings)

    // Case 2: Table API / SQL DOES need to be converted to or from
    // DataStream / DataSet. Streaming then requires a StreamTableEnvironment
    // and batch requires a BatchTableEnvironment.

    // StreamTableEnvironment on top of a StreamExecutionEnvironment.
    val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val bridgeSettings: EnvironmentSettings = blinkSettingsBuilder.inStreamingMode().build()
    val streamBridgeTableEnv = StreamTableEnvironment.create(streamExecEnv, bridgeSettings)

    // BatchTableEnvironment on top of an ExecutionEnvironment.
    // Note (from the original author): only the old execution engine can be
    // used here; the new Blink engine does not support DataSet conversion.
    val batchExecEnv = ExecutionEnvironment.getExecutionEnvironment
    val batchBridgeTableEnv = BatchTableEnvironment.create(batchExecEnv)

  }
}

2).java实现

package com.simoniu.flink.java.tablesql;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * 创建TableEnvironment对象
 * Created by simoniu
 */
public class CreateTableEnvironmentJavaDemo {

    public static void main(String[] args) {
        /**
         * 注意:如果Table API和SQL不需要和DataStream或者DataSet互相转换
         * 则针对stream和batch都可以使用TableEnvironment
         */
        //创建TableEnvironment对象-stream
        EnvironmentSettings sSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        TableEnvironment sTableEnv = TableEnvironment.create(sSettings);

        //创建TableEnvironment对象-batch
        EnvironmentSettings bSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
        TableEnvironment bTableEnv = TableEnvironment.create(bSettings);

        /**
         * 注意:如果Table API和SQL需要和DataStream或者DataSet互相转换
         * 针对stream需要使用StreamTableEnvironment
         * 针对batch需要使用BatchTableEnvironment
         */
        //创建StreamTableEnvironment
        StreamExecutionEnvironment ssEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings ssSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment ssTableEnv = StreamTableEnvironment.create(ssEnv, ssSettings);
        //创建BatchTableEnvironment
        //注意:此时只能使用旧的执行引擎,新的Blink执行引擎不支持和DataSet转换
        ExecutionEnvironment bbEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment bbTableEnv = BatchTableEnvironment.create(bbEnv);
    }
}