使用DataSet创建表。
注意:此时只能使用旧的执行引擎,新的Blink执行引擎不支持和DataSet转换。
1).scala实现
package com.simoniu.flink.scala.tablesql
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.{BatchTableEnvironment, StreamTableEnvironment}
/**
* 将DataSet转换成表
* Created by simoniu
*/
object DataSetToTableScalaDemo {
def main(args: Array[String]): Unit = {
//获取BatchTableEnvironment
val bbEnv = ExecutionEnvironment.getExecutionEnvironment
val bbTableEnv = BatchTableEnvironment.create(bbEnv)
//获取DataSet
import org.apache.flink.api.scala._
val set = bbEnv.fromCollection(Array((1, "jack", "female"), (2, "tom", "male"), (3, "mike", "male")))
//第一种:将DataSet转换为view视图
import org.apache.flink.table.api._
bbTableEnv.createTemporaryView("myTable", set, 'id, 'name, 'gender)
bbTableEnv.sqlQuery("select * from myTable where id > 1").execute().print()
//第二种:将DataSet转换为table对象
val table = bbTableEnv.fromDataSet(set, $"id", $"name", $"gender")
table.select($"id", $"name", $"gender")
.filter($"id" > 1)
.execute()
.print()
//注意:'id,'name 和 $"id", $"name" 这两种写法是一样的效果
}
}
运行结果:
+-------------+--------------------------------+--------------------------------+
| id | name | gender |
+-------------+--------------------------------+--------------------------------+
| 2 | tom | male |
| 3 | mike | male |
+-------------+--------------------------------+--------------------------------+
2 rows in set
+-------------+--------------------------------+--------------------------------+
| id | name | gender |
+-------------+--------------------------------+--------------------------------+
| 2 | tom | male |
| 3 | mike | male |
+-------------+--------------------------------+--------------------------------+
2 rows in set
2).java实现
package com.simoniu.flink.java.tablesql;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.util.ArrayList;
import static org.apache.flink.table.api.Expressions.$;
/**
* 将DataSet转换成表
* Created by simoniu
*/
public class DataSetToTableJavaDemo {
public static void main(String[] args) {
//获取BatchTableEnvironment
ExecutionEnvironment bbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment bbTableEnv = BatchTableEnvironment.create(bbEnv);
//获取DataSet
ArrayList<Tuple3<Integer, String, String>> data = new ArrayList<>();
data.add(new Tuple3<Integer, String, String>(1, "jack", "male"));
data.add(new Tuple3<Integer, String, String>(2, "tom", "female"));
data.add(new Tuple3<Integer, String, String>(3, "mike", "male"));
DataSource<Tuple3<Integer, String, String>> set = bbEnv.fromCollection(data);
//第一种:将DataSet转换为view视图
bbTableEnv.createTemporaryView("myTable", set, $("id"), $("name"), $("gender"));
bbTableEnv.sqlQuery("select * from myTable where id > 1").execute().print();
//第二种:将DataSet转换为table对象
Table table = bbTableEnv.fromDataSet(set, $("id"), $("name"), $("gender"));
table.select($("id"), $("name"), $("gender"))
.filter($("id").isGreater(1))
.execute()
.print();
}
}