使用DataStream创建表。
Table API和SQL可以很容易的和DataStream和DataSet程序集成到一块。通过TableEnvironment ,可以把DataStream或者DataSet注册为Table,这样就可以使用Table API和SQL查询了。通过TableEnvironment 也可以把Table对象转换为DataStream或者DataSet,这样就可以使用DataStream或者DataSet中的相关API了。
使用DataStream创建表,主要包含下面这两种情况:
1).scala实现
package com.simoniu.flink.scala.tablesql
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
/**
* 将DataStream转换成表
* Created by simoniu
*/
object DataStreamToTableScalaDemo {
def main(args: Array[String]): Unit = {
//获取StreamTableEnvironment
val ssEnv = StreamExecutionEnvironment.getExecutionEnvironment
val ssSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val ssTableEnv = StreamTableEnvironment.create(ssEnv, ssSettings)
//获取DataStream
import org.apache.flink.api.scala._
val stream = ssEnv.fromCollection(Array((1, "jack","male"), (2, "tom","female"), (3, "mike","male")))
//第一种:将DataStream转换为view视图
import org.apache.flink.table.api._
ssTableEnv.createTemporaryView("myTable",stream,'id,'name,'gender)
ssTableEnv.sqlQuery("select * from myTable where id > 1").execute().print()
//第二种:将DataStream转换为table对象
val table = ssTableEnv.fromDataStream(stream, $"id", $"name",$"gender")
table.select($"id",$"name",$"gender")
.filter($"id" > 1)
.execute()
.print()
//注意:'id,'name 和 $"id", $"name" 这两种写法是一样的效果
}
}
运行结果:
+-------------+--------------------------------+--------------------------------+
| id | name | gender |
+-------------+--------------------------------+--------------------------------+
| 2 | tom | female |
| 3 | mike | male |
+-------------+--------------------------------+--------------------------------+
2 rows in set
+-------------+--------------------------------+--------------------------------+
| id | name | gender |
+-------------+--------------------------------+--------------------------------+
| 2 | tom | female |
| 3 | mike | male |
+-------------+--------------------------------+--------------------------------+
2).java实现
package com.simoniu.flink.java.tablesql;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.util.ArrayList;
import static org.apache.flink.table.api.Expressions.$;
/**
* 将DataStream转换成表
* Created by simoniu
*/
public class DataStreamToTableJavaDemo {
public static void main(String[] args) {
//获取StreamTableEnvironment
StreamExecutionEnvironment ssEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings ssSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment ssTableEnv = StreamTableEnvironment.create(ssEnv, ssSettings);
//获取DataStream
ArrayList<Tuple3<Integer, String, String>> data = new ArrayList<>();
data.add(new Tuple3<Integer, String, String>(1, "jack", "male"));
data.add(new Tuple3<Integer, String, String>(2, "tom", "female"));
data.add(new Tuple3<Integer, String, String>(3, "mike", "male"));
DataStreamSource<Tuple3<Integer, String, String>> stream = ssEnv.fromCollection(data);
//第一种:将DataStream转换为view视图
ssTableEnv.createTemporaryView("myTable", stream, $("id"), $("name"), $("gender"));
ssTableEnv.sqlQuery("select * from myTable where id > 1").execute().print();
//第二种:将DataStream转换为table对象
Table table = ssTableEnv.fromDataStream(stream, $("id"), $("name"), $("gender"));
table.select($("id"), $("name"), $("gender"))
.filter($("id").isGreater(1))
.execute()
.print();
}
}