← 返回首页
Transformation-first-n
发表时间:2023-11-25 16:37:52
DataSetAPI之Transformation-first-n

DataSetAPI之Transformation-first-n。

1.DataSetAPI之Transformation-first-n

union:返回两个数据集的总和,数据类型需要一致。和DataStreamAPI中的union操作功能一样,这里我们不再赘述。

first-n:获取集合中的前N个元素。

first-n案例:

1)scala实现

package com.simoniu.flink.scala.batch.transformation

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment

import scala.collection.mutable.ListBuffer

/**
 * first-n:获取集合中的前N个元素
 * Created by simoniu
 */
object BatchFirstNScalaDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = ListBuffer[Tuple2[Int,String]]()
    data.append((2,"zookeeper"))
    data.append((4,"link"))
    data.append((3,"windows"))
    data.append((1,"applie"))
    data.append((1,"xing"))
    data.append((1,"maven"))

    import org.apache.flink.api.scala._
    //初始化数据
    val text = env.fromCollection(data)

    //获取前3条数据,按照数据插入的顺序
    text.first(3).print()
    println("==================================")

    //根据数据中的第一列进行分组,获取每组的前2个元素
    text.groupBy(0).first(2).print()
    println("==================================")

    //根据数据中的第一列分组,再根据第二列进行组内排序[倒序],获取每组的前2个元素
    //分组排序取TopN
    text.groupBy(0).sortGroup(1,Order.DESCENDING).first(2).print()

  }
}

运行结果:

(2,zookeeper)
(4,link)
(3,windows)
==================================
(3,windows)
(1,applie)
(1,xing)
(4,link)
(2,zookeeper)
==================================
(3,windows)
(1,xing)
(1,maven)
(4,link)
(2,zookeeper)

2).java实现

package com.simoniu.flink.java.batch.transformation;

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.ArrayList;

/**
 * first-n:获取集合中的前N个元素
 * Created by simoniu
 */
public class BatchFirstNJavaDemo {

    public static void main(String[] args) throws Exception{
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
        data.add(new Tuple2<Integer,String>(2,"zookeeper"));
        data.add(new Tuple2<Integer,String>(4,"link"));
        data.add(new Tuple2<Integer,String>(3,"windows"));
        data.add(new Tuple2<Integer,String>(1,"appke"));
        data.add(new Tuple2<Integer,String>(1,"xing"));
        data.add(new Tuple2<Integer,String>(1,"maven"));
        //初始化数据
        DataSource<Tuple2<Integer, String>> text = env.fromCollection(data);

        //获取前3条数据,按照数据插入的顺序
        text.first(3).print();
        System.out.println("====================================");

        //根据数据中的第一列进行分组,获取每组的前2个元素
        text.groupBy(0).first(2).print();
        System.out.println("====================================");

        //根据数据中的第一列分组,再根据第二列进行组内排序[倒序],获取每组的前2个元素
        //分组排序取TopN
        text.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print();
    }
}

2.DataSet API之DataSink

Flink针对DataSet提供了一些已经实现好的数据目的地,其中最常见的是向HDFS中写入数据。常用的API如下: