← 返回首页
Transformation-mapPartition
发表时间:2023-11-24 16:12:21
DataSetAPI之Transformation-mapPartition

DataSetAPI之Transformation-map

1.DataSet API

与DataStream类似 DataSet API主要可以分为3块来分析:DataSource、Transformation、Sink。

2.DataSet API之DataSource

DataSet批处理而言,其实最多的就是读取HDFS中的文件数据,所以在这里我们主要介绍两个DataSource组件。

3.DataSet API之Transformation

mapPartition这个算子我们在Flink中还没用过,不过在Spark 中是用过的,用法也是一样的 其实mapPartition就是一次处理一批数据,如果在处理数据的时候想要获取第三方资源连接,建议使用mapPartition,这样可以一批数据获取一次连接,提高性能。

Flink中mapPartition的使用案例:

1).scala实现

package com.simoniu.flink.scala.batch.transformation

import org.apache.flink.api.scala.ExecutionEnvironment

import scala.collection.mutable.ListBuffer

/**
 * MapPartition的使用:一次处理一个分区的数据
 * Created by simoniu
 */
object BatchMapPartitionScalaDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._
    //生成数据源数据
    val text = env.fromCollection(Array("hello linux", "hello hadoop","hello spark"))

    //每次处理一个分区的数据
    text.mapPartition(it=>{
      //可以在此处创建数据库连接,建议把这块代码放到try-catch代码块中
      //注意:此时是每个分区获取一个数据库连接,不需要每处理一条数据就获取一次连接,性能较高
      val res = ListBuffer[String]()
      it.foreach(line=>{
        val words = line.split(" ")
        for(word <- words){
          res.append(word)
        }
      })
      res
      //关闭数据库连接
    }).print()
    //No new data sinks have been defined since the last execution.
    //The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.
    //注意:针对DataSetAPI,如果在后面调用的是count、collect、print,则最后不需要指定execute即可。
    //env.execute("BatchMapPartitionScala")
  }
}

运行结果:

hello
linux
hello
hadoop
hello
spark

2).java实现

package com.simoniu.flink.java.batch.transformation;

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;

import java.util.Arrays;
import java.util.Iterator;

/**
 * MapPartition的使用:一次处理一个分区的数据
 * Created by simoniu
 */
public class BatchMapPartitionJavaDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //生成数据源数据
        DataSource<String> text = env.fromCollection(Arrays.asList("hello linux", "hello hadoop","hello spark"));

        //每次处理一个分区的数据
        text.mapPartition(new MapPartitionFunction<String, String>() {
            @Override
            public void mapPartition(Iterable<String> iterable, Collector<String> out)
                    throws Exception {
                //可以在此处创建数据库连接,建议把这块代码放到try-catch代码块中
                Iterator<String> it = iterable.iterator();
                while (it.hasNext()) {
                    String line = it.next();
                    String[] words = line.split(" ");
                    for (String word : words) {
                        out.collect(word);
                    }
                }
                //关闭数据库连接
            }
        }).print();
    }
}