DataSetAPI之Transformation-first-n。
union:返回两个数据集的总和,数据类型需要一致。和DataStreamAPI中的union操作功能一样,这里我们不再赘述。
first-n:获取集合中的前N个元素。
first-n案例:
1)scala实现
package com.simoniu.flink.scala.batch.transformation
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ListBuffer
/**
* first-n:获取集合中的前N个元素
* Created by simoniu
*/
object BatchFirstNScalaDemo {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val data = ListBuffer[Tuple2[Int,String]]()
data.append((2,"zookeeper"))
data.append((4,"link"))
data.append((3,"windows"))
data.append((1,"applie"))
data.append((1,"xing"))
data.append((1,"maven"))
import org.apache.flink.api.scala._
//初始化数据
val text = env.fromCollection(data)
//获取前3条数据,按照数据插入的顺序
text.first(3).print()
println("==================================")
//根据数据中的第一列进行分组,获取每组的前2个元素
text.groupBy(0).first(2).print()
println("==================================")
//根据数据中的第一列分组,再根据第二列进行组内排序[倒序],获取每组的前2个元素
//分组排序取TopN
text.groupBy(0).sortGroup(1,Order.DESCENDING).first(2).print()
}
}
运行结果:
(2,zookeeper)
(4,link)
(3,windows)
==================================
(3,windows)
(1,applie)
(1,xing)
(4,link)
(2,zookeeper)
==================================
(3,windows)
(1,xing)
(1,maven)
(4,link)
(2,zookeeper)
2).java实现
package com.simoniu.flink.java.batch.transformation;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.ArrayList;
/**
* first-n:获取集合中的前N个元素
* Created by simoniu
*/
public class BatchFirstNJavaDemo {
public static void main(String[] args) throws Exception{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
data.add(new Tuple2<Integer,String>(2,"zookeeper"));
data.add(new Tuple2<Integer,String>(4,"link"));
data.add(new Tuple2<Integer,String>(3,"windows"));
data.add(new Tuple2<Integer,String>(1,"appke"));
data.add(new Tuple2<Integer,String>(1,"xing"));
data.add(new Tuple2<Integer,String>(1,"maven"));
//初始化数据
DataSource<Tuple2<Integer, String>> text = env.fromCollection(data);
//获取前3条数据,按照数据插入的顺序
text.first(3).print();
System.out.println("====================================");
//根据数据中的第一列进行分组,获取每组的前2个元素
text.groupBy(0).first(2).print();
System.out.println("====================================");
//根据数据中的第一列分组,再根据第二列进行组内排序[倒序],获取每组的前2个元素
//分组排序取TopN
text.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print();
}
}
Flink针对DataSet提供了一些已经实现好的数据目的地,其中最常见的是向HDFS中写入数据。常用的API如下: