Flink Batch Program Development (Java)
Batch WordCount example. Requirement: count the total number of occurrences of each word in a given file, demonstrating Flink's offline batch-processing capability.
The Java implementation is as follows:
package com.simoniu.flink.java;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
 * Requirement: count the total number of occurrences of each word in a given file.
 * Created by simoniu
 */
public class BatchWordCountJava {
    public static void main(String[] args) throws Exception {
        // Obtain the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        String inputPath = "hdfs://master:9000/test/input.txt";
        String outPath = "hdfs://master:9000/test/output.txt";
        // Read the input file line by line
        DataSource<String> text = env.readTextFile(inputPath);
        // Split each line into words, emit (word, 1), then group by word and sum the counts
        AggregateOperator<Tuple2<String, Integer>> wordCount = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            out.collect(new Tuple2<String, Integer>(word, 1));
                        }
                    }
                })
                .groupBy(0)   // group by the first tuple field (the word)
                .sum(1)       // sum the second tuple field (the count)
                .setParallelism(1);
        // Write the result as text: one record per line, fields separated by a space
        wordCount.writeAsCsv(outPath, "\n", " ");
        // Trigger job execution
        env.execute("BatchWordCountJava");
    }
}
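Note that the DataSet API used above is Flink's legacy batch API. Since Flink 1.12, the same job can be expressed with the DataStream API running in batch execution mode. The following is a minimal sketch, assuming a Flink 1.12+ dependency; the class name BatchWordCountDataStream is illustrative and the HDFS input path is reused from the example above:

package com.simoniu.flink.java;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Same WordCount expressed with the DataStream API in batch execution mode (Flink 1.12+).
 */
public class BatchWordCountDataStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run the pipeline as a bounded (batch) job
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.readTextFile("hdfs://master:9000/test/input.txt")
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                        for (String word : line.split(" ")) {
                            out.collect(new Tuple2<String, Integer>(word, 1));
                        }
                    }
                })
                .keyBy(t -> t.f0)   // key by the word
                .sum(1)             // sum the counts per word
                .print();           // print the final counts to stdout

        env.execute("BatchWordCountDataStream");
    }
}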
The test data in input.txt is as follows:
I love spark
I like flink
I love hadoop
I like hadoop
I like hive
I like spark
I like flume
I like linux
Run result:
[root@master sbin]# hdfs dfs -cat /test/output.txt
I 8
flink 1
flume 1
hadoop 2
hive 1
like 6
linux 1
love 2
spark 2
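For quick local verification, the CSV sink in BatchWordCountJava can also be replaced with a plain print to stdout. A minimal sketch, assuming the same wordCount dataset as above; note that in the DataSet API print() triggers execution by itself, so the explicit env.execute() call must be removed in that case:

// Print the word counts to stdout; in the DataSet API, print()
// executes the job immediately, so env.execute() is not called.
wordCount.print();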