← 返回首页
将表转换成DataStream
发表时间:2023-11-28 14:35:11
将表转换成DataStream

将表转换成DataStream。

1.将表转换成DataStream

流式查询的结果Table会被动态地更新,即每当有新的记录到达输入流时,查询结果就会随之变化。因此,将这样的动态查询结果转换为DataStream时,需要对表的更新动作(新增、撤回)进行编码。有以下几种模式可以将Table转换为DataStream。

1).scala实现

这里使用了《TableAPI和SQL的使用》小节的测试数据创建表。

package com.simoniu.flink.scala.tablesql

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

/**
 * 将table转换成 DataStream
 * Created by simoniu
 */
object TableToDataStreamScalaDemo {
  def main(args: Array[String]): Unit = {
    // Set up the stream execution environment and a Blink-planner table environment on top of it.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // Register the input table, backed by a CSV file on the local filesystem.
    // (Built line-by-line and joined with '\n' — the resulting DDL string is
    // identical to a plain concatenation.)
    val createUsersDdl = Seq(
      "create table source_users(",
      "id int,",
      "name string,",
      "password string",
      ") with (",
      "'connector.type' = 'filesystem',",
      "'connector.path' = 'D:\\uploadFiles\\source',",
      "'format.type' = 'csv'",
      ")"
    ).mkString("\n")
    tableEnv.executeSql(createUsersDdl)

    // Obtain a Table handle for the registered table.
    val usersTable = tableEnv.from("source_users")

    // Append mode: valid only when the table is modified exclusively by
    // INSERT (append) operations.
    import org.apache.flink.api.scala._
    tableEnv
      .toAppendStream[Row](usersTable)
      .map { row =>
        (row.getField(0).toString.toInt, row.getField(1).toString, row.getField(2).toString)
      }
      .print()

    // Retract mode: every element carries a Boolean flag (true = add,
    // false = retract), so deletions/updates can be encoded as well.
    tableEnv
      .toRetractStream[Row](usersTable)
      .map { change =>
        val isAdd = change._1
        val row = change._2
        (isAdd,
          row.getField(0).toString.toInt,
          row.getField(1).toString,
          row.getField(2).toString)
      }
      .print()

    // NOTE: once a Table has been converted to a DataStream, the job must be
    // launched via StreamExecutionEnvironment.execute.
    env.execute("TableToDataStreamScala")
  }
}

运行结果:

4> (205,郭靖,888888)
8> (209,欧阳锋,55555)
9> (210,黄药师,66666)
5> (206,黄蓉,666666)
12> (202,东方不败,654321)
4> (true,203,任我选,222222)
2> (204,风清扬,888888)
7> (208,王重阳,11111)
6> (207,令狐冲,999999)
11> (true,209,欧阳锋,55555)
7> (true,205,郭靖,888888)
12> (true,210,黄药师,66666)
11> (201,张三丰,123456)
9> (true,207,令狐冲,999999)
3> (true,202,东方不败,654321)
2> (true,201,张三丰,123456)
5> (true,204,风清扬,888888)
8> (true,206,黄蓉,666666)
1> (203,任我选,222222)
10> (true,208,王重阳,11111)

2).java实现

package com.simoniu.flink.java.tablesql;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * 将table转换成 DataStream
 * Created by simoniu
 */
/**
 * Demonstrates converting a Table into a DataStream, in both
 * append mode and retract mode.
 */
public class TableToDataStreamJavaDemo {
    public static void main(String[] args) throws Exception {
        // Set up the stream execution environment and a Blink-planner table environment on top of it.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Register the input table, backed by a CSV file on the local filesystem.
        String createUsersDdl =
                "create table source_users(\n"
                        + "id int,\n"
                        + "name string,\n"
                        + "password string\n"
                        + ") with (\n"
                        + "'connector.type' = 'filesystem',\n"
                        + "'connector.path' = 'D:\\uploadFiles\\source',\n"
                        + "'format.type' = 'csv'\n"
                        + ")";
        tableEnv.executeSql(createUsersDdl);

        // Obtain a Table handle for the registered table.
        Table usersTable = tableEnv.from("source_users");

        // Append mode: valid only when the table is modified exclusively by
        // INSERT (append) operations.
        DataStream<Row> appendStream = tableEnv.toAppendStream(usersTable, Row.class);
        appendStream.map(new MapFunction<Row, Tuple3<Integer, String, String>>() {
            @Override
            public Tuple3<Integer, String, String> map(Row row) throws Exception {
                return new Tuple3<>(
                        Integer.parseInt(row.getField(0).toString()),
                        row.getField(1).toString(),
                        row.getField(2).toString());
            }
        }).print();

        // Retract mode: every element carries a Boolean flag (true = add,
        // false = retract), so deletions/updates can be encoded as well.
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(usersTable, Row.class);
        retractStream.map(new MapFunction<Tuple2<Boolean, Row>, Tuple4<Boolean, Integer, String, String>>() {
            @Override
            public Tuple4<Boolean, Integer, String, String> map(Tuple2<Boolean, Row> change) throws Exception {
                Row row = change.f1;
                return new Tuple4<>(
                        change.f0,
                        Integer.parseInt(row.getField(0).toString()),
                        row.getField(1).toString(),
                        row.getField(2).toString());
            }
        }).print();

        // NOTE: once a Table has been converted to a DataStream, the job must be
        // launched via StreamExecutionEnvironment.execute.
        env.execute("TableToDataStreamJava");
    }
}