将表转换成DataStream。
流式查询的结果Table会被动态地更新,即每当有新记录到达输入流时,结果就会随之变化。因此,由此动态查询转换得到的DataStream需要对表的更新操作进行编码。有以下几种模式可以将Table转换为DataStream。
1).scala实现
这里使用了《TableAPI和SQL的使用》小节的测试数据创建表。
package com.simoniu.flink.scala.tablesql
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row
/**
* 将table转换成 DataStream
* Created by simoniu
*/
/**
 * Demonstrates converting a Table into a DataStream, both as an
 * append-only stream and as a retract stream.
 * Created by simoniu
 */
object TableToDataStreamScalaDemo {
  def main(args: Array[String]): Unit = {
    // Stream environment plus a Blink-planner table environment in streaming mode.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // Register the CSV-backed input table via DDL.
    val ddl = Seq(
      "create table source_users(",
      "id int,",
      "name string,",
      "password string",
      ") with (",
      "'connector.type' = 'filesystem',",
      "'connector.path' = 'D:\\uploadFiles\\source',",
      "'format.type' = 'csv'",
      ")"
    ).mkString("\n")
    tableEnv.executeSql(ddl)

    // Look up the registered table.
    val users = tableEnv.from("source_users")

    import org.apache.flink.api.scala._

    // Append-only conversion: valid when the query only ever adds rows.
    tableEnv.toAppendStream[Row](users)
      .map { row =>
        val id = row.getField(0).toString.toInt
        (id, row.getField(1).toString, row.getField(2).toString)
      }
      .print()

    // Retract conversion: each record carries a flag (true = add, false = retract).
    tableEnv.toRetractStream[Row](users)
      .map { case (isInsert, row) =>
        val id = row.getField(0).toString.toInt
        val name = row.getField(1).toString
        val password = row.getField(2).toString
        (isInsert, id, name, password)
      }
      .print()

    // Once a Table has been converted to a DataStream, the job must be
    // launched through the StreamExecutionEnvironment's execute method.
    env.execute("TableToDataStreamScala")
  }
}
运行结果:
4> (205,郭靖,888888)
8> (209,欧阳锋,55555)
9> (210,黄药师,66666)
5> (206,黄蓉,666666)
12> (202,东方不败,654321)
4> (true,203,任我选,222222)
2> (204,风清扬,888888)
7> (208,王重阳,11111)
6> (207,令狐冲,999999)
11> (true,209,欧阳锋,55555)
7> (true,205,郭靖,888888)
12> (true,210,黄药师,66666)
11> (201,张三丰,123456)
9> (true,207,令狐冲,999999)
3> (true,202,东方不败,654321)
2> (true,201,张三丰,123456)
5> (true,204,风清扬,888888)
8> (true,206,黄蓉,666666)
1> (203,任我选,222222)
10> (true,208,王重阳,11111)
2).java实现
package com.simoniu.flink.java.tablesql;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
* 将table转换成 DataStream
* Created by simoniu
*/
/**
 * Demonstrates converting a Table into a DataStream, both as an
 * append-only stream and as a retract stream.
 * Created by simoniu
 */
public class TableToDataStreamJavaDemo {
    public static void main(String[] args) throws Exception {
        // Stream environment plus a Blink-planner table environment in streaming mode.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Register the CSV-backed input table via DDL.
        String ddl = String.join("\n",
                "create table source_users(",
                "id int,",
                "name string,",
                "password string",
                ") with (",
                "'connector.type' = 'filesystem',",
                "'connector.path' = 'D:\\uploadFiles\\source',",
                "'format.type' = 'csv'",
                ")");
        tableEnv.executeSql(ddl);

        // Look up the registered table.
        Table users = tableEnv.from("source_users");

        // Append-only conversion: valid when the query only ever adds rows.
        DataStream<Row> appendStream = tableEnv.toAppendStream(users, Row.class);
        appendStream.map(new MapFunction<Row, Tuple3<Integer, String, String>>() {
            @Override
            public Tuple3<Integer, String, String> map(Row row) throws Exception {
                return new Tuple3<>(
                        Integer.parseInt(row.getField(0).toString()),
                        row.getField(1).toString(),
                        row.getField(2).toString());
            }
        }).print();

        // Retract conversion: each record carries a flag (true = add, false = retract).
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(users, Row.class);
        retractStream.map(new MapFunction<Tuple2<Boolean, Row>, Tuple4<Boolean, Integer, String, String>>() {
            @Override
            public Tuple4<Boolean, Integer, String, String> map(Tuple2<Boolean, Row> record)
                    throws Exception {
                Row row = record.f1;
                return new Tuple4<>(
                        record.f0,
                        Integer.parseInt(row.getField(0).toString()),
                        row.getField(1).toString(),
                        row.getField(2).toString());
            }
        }).print();

        // Once a Table has been converted to a DataStream, the job must be
        // launched through the StreamExecutionEnvironment's execute method.
        env.execute("TableToDataStreamJava");
    }
}