将表转换成DataSet。
1).scala实现
这里使用了《TableAPI和SQL的使用》小节的测试数据创建表。
package com.simoniu.flink.scala.tablesql
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment
import org.apache.flink.types.Row
/**
* 将table转换成 DataSet
* Created by simoniu
*/
object TableToDataSetScalaDemo {
def main(args: Array[String]): Unit = {
//获取BatchTableEnvironment
val bbEnv = ExecutionEnvironment.getExecutionEnvironment
val bbTableEnv = BatchTableEnvironment.create(bbEnv)
//创建输入表
bbTableEnv.executeSql("" +
"create table source_users(\n" +
"id int,\n" +
"name string,\n" +
"password string\n" +
") with (\n" +
"'connector.type' = 'filesystem',\n" +
"'connector.path' = 'D:\\uploadFiles\\source',\n" +
"'format.type' = 'csv'\n" +
")")
//获取table
val table = bbTableEnv.from("source_users")
//将table转换为DataSet
import org.apache.flink.api.scala._
val set = bbTableEnv.toDataSet[Row](table)
set.map(row=>(row.getField(0).toString.toInt,row.getField(1).toString,row.getField(2).toString))
.print()
}
}
运行结果:
(210,黄药师,66666)
(208,王重阳,11111)
(205,郭靖,888888)
(204,风清扬,888888)
(201,张三丰,123456)
(203,任我选,222222)
(206,黄蓉,666666)
(207,令狐冲,999999)
(209,欧阳锋,55555)
(202,东方不败,654321)
2).java实现
package com.simoniu.flink.java.tablesql;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
* 将table转换成 DataSet
* Created by simoniu
*/
public class TableToDataSetJavaDemo {
public static void main(String[] args) throws Exception {
//获取BatchTableEnvironment
ExecutionEnvironment bbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment bbTableEnv = BatchTableEnvironment.create(bbEnv);
//创建输入表
bbTableEnv.executeSql("" +
"create table source_users(\n" +
"id int,\n" +
"name string,\n" +
"password string\n" +
") with (\n" +
"'connector.type' = 'filesystem',\n" +
"'connector.path' = 'D:\\uploadFiles\\source',\n" +
"'format.type' = 'csv'\n" +
")");
//获取table
Table table = bbTableEnv.from("source_users");
//将table转换为DataSet
DataSet<Row> set = bbTableEnv.toDataSet(table, Row.class);
set.map(new MapFunction<Row, Tuple3<Integer, String, String>>() {
@Override
public Tuple3<Integer, String, String> map(Row row)
throws Exception {
int id = Integer.parseInt(row.getField(0).toString());
String name = row.getField(1).toString();
String password = row.getField(2).toString();
return new Tuple3<Integer, String, String>(id, name, password);
}
}).print();
}
}