TableEnvironment对象。
使用Table API和SQL时,首先要创建一个TableEnvironment对象。接下来我们分别使用Scala和Java实现TableEnvironment对象的创建。
1) Scala实现
package com.simoniu.flink.scala.tablesql
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.{BatchTableEnvironment, StreamTableEnvironment}
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
/**
 * Demonstrates the different ways of creating a table environment for the
 * Table API and SQL.
 *
 * Created by simoniu
 */
object CreateTableEnvironmentScalaDemo {

  /**
   * Creates one table environment of each flavour. The created instances are
   * not used any further; the program only shows how to construct them.
   *
   * @param args command-line arguments (not read)
   */
  def main(args: Array[String]): Unit = {
    // Case 1: the Table API / SQL program does NOT need to be converted
    // to or from a DataStream or DataSet. A plain TableEnvironment is then
    // enough for both streaming and batch processing.

    // Blink planner in streaming mode. As of Flink 1.11 the Blink planner is
    // the default execution engine for the Table API and SQL and is the
    // recommended choice for production use.
    val streamingSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val streamingTableEnv = TableEnvironment.create(streamingSettings)

    // Blink planner in batch mode.
    val batchSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inBatchMode()
      .build()
    val batchTableEnv = TableEnvironment.create(batchSettings)

    // Case 2: the Table API / SQL program DOES need to be converted to or
    // from a DataStream or DataSet. Streaming then requires a
    // StreamTableEnvironment and batch requires a BatchTableEnvironment.

    // StreamTableEnvironment on top of a DataStream execution environment.
    val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val streamBridgeSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val streamBridgeTableEnv = StreamTableEnvironment.create(streamExecEnv, streamBridgeSettings)

    // BatchTableEnvironment on top of a DataSet execution environment.
    // Note: only the old planner can be used here, because the Blink planner
    // does not support conversion to and from DataSet.
    val dataSetExecEnv = ExecutionEnvironment.getExecutionEnvironment
    val dataSetTableEnv = BatchTableEnvironment.create(dataSetExecEnv)
  }
}
2) Java实现
package com.simoniu.flink.java.tablesql;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * Demonstrates the different ways of creating a table environment for the
 * Table API and SQL.
 *
 * Created by simoniu
 */
public class CreateTableEnvironmentJavaDemo {

    /**
     * Creates one table environment of each flavour. The created instances are
     * not used any further; the program only shows how to construct them.
     *
     * @param args command-line arguments (not read)
     */
    public static void main(String[] args) {
        // Case 1: the Table API / SQL program does NOT need to be converted
        // to or from a DataStream or DataSet. A plain TableEnvironment is then
        // enough for both streaming and batch processing.

        // TableEnvironment for streaming (Blink planner).
        EnvironmentSettings streamingSettings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        TableEnvironment streamingTableEnv = TableEnvironment.create(streamingSettings);

        // TableEnvironment for batch (Blink planner).
        EnvironmentSettings batchSettings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment batchTableEnv = TableEnvironment.create(batchSettings);

        // Case 2: the Table API / SQL program DOES need to be converted to or
        // from a DataStream or DataSet. Streaming then requires a
        // StreamTableEnvironment and batch requires a BatchTableEnvironment.

        // StreamTableEnvironment on top of a DataStream execution environment.
        StreamExecutionEnvironment streamExecEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings streamBridgeSettings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment streamBridgeTableEnv =
                StreamTableEnvironment.create(streamExecEnv, streamBridgeSettings);

        // BatchTableEnvironment on top of a DataSet execution environment.
        // Note: only the old planner can be used here, because the Blink
        // planner does not support conversion to and from DataSet.
        ExecutionEnvironment dataSetExecEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment dataSetTableEnv = BatchTableEnvironment.create(dataSetExecEnv);
    }
}