← 返回首页
将表转换成DataSet
发表时间:2023-11-28 14:39:44
将表转换成DataSet

将表转换成DataSet。

1.将表转换成 DataSet

1) Scala 实现

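这里使用《TableAPI和SQL的使用》小节中的测试数据来创建表。注意:BatchTableEnvironment 以及 Table 与 DataSet 之间的互相转换只在旧版 planner 中提供,Flink 1.14 起已被移除,因此本示例需要在 Flink 1.13 及更早的版本上运行。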

package com.simoniu.flink.scala.tablesql

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment
import org.apache.flink.types.Row

/**
 * Converts a Table into a DataSet with the legacy batch Table API.
 *
 * Registers the CSV-backed filesystem table `source_users(id int, name string, password string)`,
 * reads it back as a [[org.apache.flink.table.api.Table]], converts it to `DataSet[Row]` and
 * prints every row as an `(Int, String, String)` tuple.
 *
 * Usage: `TableToDataSetScalaDemo [sourcePath]` - the optional first argument overrides the
 * source location; without it the original hard-coded Windows path is used.
 *
 * Created by simoniu
 */
object TableToDataSetScalaDemo {

  /** Source location used when none is passed on the command line (the original demo path). */
  private val DefaultSourcePath: String = "D:\\uploadFiles\\source"

  /** Quotes `s` as a SQL string literal, doubling embedded single quotes. */
  private def sqlStringLiteral(s: String): String = "'" + s.replace("'", "''") + "'"

  /** Builds the DDL that registers `source_users` over the CSV data at `sourcePath`. */
  private def createSourceUsersDdl(sourcePath: String): String =
    s"""create table source_users(
       |id int,
       |name string,
       |password string
       |) with (
       |'connector.type' = 'filesystem',
       |'connector.path' = ${sqlStringLiteral(sourcePath)},
       |'format.type' = 'csv'
       |)""".stripMargin

  /**
   * Entry point.
   *
   * @param args optional; `args(0)`, when present and non-empty, is the source path
   */
  def main(args: Array[String]): Unit = {
    val sourcePath = args.headOption.filter(_.nonEmpty).getOrElse(DefaultSourcePath)

    // BatchTableEnvironment (legacy planner) is what offers Table <-> DataSet conversion.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = BatchTableEnvironment.create(env)

    // Register the input table, then obtain it as a Table.
    tableEnv.executeSql(createSourceUsersDdl(sourcePath))
    val table = tableEnv.from("source_users")

    // Brings the implicit TypeInformation needed by toDataSet[Row] and map into scope.
    import org.apache.flink.api.scala._
    val rows = tableEnv.toDataSet[Row](table)

    // Fields are read positionally in DDL column order; a null field would throw
    // a NullPointerException from toString.
    rows
      .map(row => (row.getField(0).toString.toInt, row.getField(1).toString, row.getField(2).toString))
      .print()
  }
}

运行结果(DataSet 为并行处理,各行的输出顺序不固定):

(210,黄药师,66666)
(208,王重阳,11111)
(205,郭靖,888888)
(204,风清扬,888888)
(201,张三丰,123456)
(203,任我选,222222)
(206,黄蓉,666666)
(207,令狐冲,999999)
(209,欧阳锋,55555)
(202,东方不败,654321)

2) Java 实现

package com.simoniu.flink.java.tablesql;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Converts a Table into a DataSet with the legacy batch Table API.
 *
 * <p>Registers the CSV-backed filesystem table
 * {@code source_users(id int, name string, password string)}, reads it back as a {@link Table},
 * converts it to {@code DataSet<Row>} and prints every row as a
 * {@code Tuple3<Integer, String, String>}.
 *
 * <p>Usage: {@code TableToDataSetJavaDemo [sourcePath]} - the optional first argument overrides
 * the source location; without it the original hard-coded Windows path is used.
 *
 * Created by simoniu
 */
public class TableToDataSetJavaDemo {

    /** Source location used when none is passed on the command line (the original demo path). */
    private static final String DEFAULT_SOURCE_PATH = "D:\\uploadFiles\\source";

    /**
     * Entry point.
     *
     * @param args optional; {@code args[0]}, when present and non-empty, is the source path
     * @throws Exception propagated from Flink job execution
     */
    public static void main(String[] args) throws Exception {
        String sourcePath = (args.length > 0 && !args[0].isEmpty()) ? args[0] : DEFAULT_SOURCE_PATH;

        // BatchTableEnvironment (legacy planner) is what offers Table <-> DataSet conversion.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

        // Register the input table, then obtain it as a Table.
        tableEnv.executeSql(createSourceUsersDdl(sourcePath));
        Table table = tableEnv.from("source_users");

        DataSet<Row> rows = tableEnv.toDataSet(table, Row.class);
        // An anonymous class (rather than a lambda) keeps the generic Tuple3 return type
        // visible to Flink's type extraction, so no explicit .returns(...) hint is needed.
        rows.map(new MapFunction<Row, Tuple3<Integer, String, String>>() {
            @Override
            public Tuple3<Integer, String, String> map(Row row) {
                // Fields are read positionally in DDL column order; a null field would
                // throw a NullPointerException from toString().
                int id = Integer.parseInt(row.getField(0).toString());
                String name = row.getField(1).toString();
                String password = row.getField(2).toString();
                return Tuple3.of(id, name, password);
            }
        }).print();
    }

    /** Builds the DDL that registers {@code source_users} over the CSV data at {@code sourcePath}. */
    private static String createSourceUsersDdl(String sourcePath) {
        // Double embedded single quotes so the path is a valid SQL string literal.
        String quotedPath = "'" + sourcePath.replace("'", "''") + "'";
        return String.join("\n",
                "create table source_users(",
                "id int,",
                "name string,",
                "password string",
                ") with (",
                "'connector.type' = 'filesystem',",
                "'connector.path' = " + quotedPath + ",",
                "'format.type' = 'csv'",
                ")");
    }
}