DataSetAPI之Transformation-map
与DataStream类似 DataSet API主要可以分为3块来分析:DataSource、Transformation、Sink。
DataSet批处理而言,其实最多的就是读取HDFS中的文件数据,所以在这里我们主要介绍两个DataSource组件。
基于集合 fromCollection(Collection),主要是为了方便测试使用。它的用法和DataStreamAPI中的用法一样,我们已经用过很多次了。
基于文件 readTextFile(path),读取hdfs中的数据文件。这个前面我们也使用过了。

mapPartition这个算子我们在Flink中还没用过,不过在Spark 中是用过的,用法也是一样的 其实mapPartition就是一次处理一批数据,如果在处理数据的时候想要获取第三方资源连接,建议使用mapPartition,这样可以一批数据获取一次连接,提高性能。
Flink中mapPartition的使用案例:
1).scala实现
package com.simoniu.flink.scala.batch.transformation
import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ListBuffer
/**
* MapPartition的使用:一次处理一个分区的数据
* Created by simoniu
*/
object BatchMapPartitionScalaDemo {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//生成数据源数据
val text = env.fromCollection(Array("hello linux", "hello hadoop","hello spark"))
//每次处理一个分区的数据
text.mapPartition(it=>{
//可以在此处创建数据库连接,建议把这块代码放到try-catch代码块中
//注意:此时是每个分区获取一个数据库连接,不需要每处理一条数据就获取一次连接,性能较高
val res = ListBuffer[String]()
it.foreach(line=>{
val words = line.split(" ")
for(word <- words){
res.append(word)
}
})
res
//关闭数据库连接
}).print()
//No new data sinks have been defined since the last execution.
//The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.
//注意:针对DataSetAPI,如果在后面调用的是count、collect、print,则最后不需要指定execute即可。
//env.execute("BatchMapPartitionScala")
}
}
运行结果:
hello
linux
hello
hadoop
hello
spark
2).java实现
package com.simoniu.flink.java.batch.transformation;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;
import java.util.Arrays;
import java.util.Iterator;
/**
* MapPartition的使用:一次处理一个分区的数据
* Created by simoniu
*/
public class BatchMapPartitionJavaDemo {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//生成数据源数据
DataSource<String> text = env.fromCollection(Arrays.asList("hello linux", "hello hadoop","hello spark"));
//每次处理一个分区的数据
text.mapPartition(new MapPartitionFunction<String, String>() {
@Override
public void mapPartition(Iterable<String> iterable, Collector<String> out)
throws Exception {
//可以在此处创建数据库连接,建议把这块代码放到try-catch代码块中
Iterator<String> it = iterable.iterator();
while (it.hasNext()) {
String line = it.next();
String[] words = line.split(" ");
for (String word : words) {
out.collect(word);
}
}
//关闭数据库连接
}
}).print();
}
}