Flink Batch Program Development (Scala)
The previous section used Flink Streaming to demonstrate real-time processing; Flink Batch demonstrates Flink's offline computation capability.
Batch WordCount example. Requirement: count the total number of occurrences of each word in a given file, demonstrating Flink's offline batch-processing capability.
Since the file we want to process resides on HDFS, we need to add the hadoop-client dependency to the pom.xml from the previous section.
<properties>
    ...
    <hadoop-client.version>3.1.3</hadoop-client.version>
</properties>
<dependencies>
    ...
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop-client.version}</version>
    </dependency>
</dependencies>
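For reference, the Flink dependencies carried over from the previous section's pom.xml are assumed to look roughly like the sketch below; the artifact names are standard for the DataSet API, but the Scala-suffix and the `flink.version` property are assumptions that must match your actual setup.

```xml
<!-- Assumed to already be present from the previous section's pom.xml -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
```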
Scala implementation:
package com.simoniu.flink.scala

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

/**
 * Requirement: count the total occurrences of each word in a given file.
 * Created by simoniu
 */
object BatchWordCountScala {
  def main(args: Array[String]): Unit = {
    // Obtain the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val inputPath = "hdfs://master:9000/test/input.txt"
    val outPath = "hdfs://master:9000/test/output.txt"
    // Read the lines of the file
    val text = env.readTextFile(inputPath)
    // Transform the data: split lines into words, pair each word with 1,
    // group by the word, and sum the counts
    val wordCount = text.flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(0)
      .sum(1)
    // Write the result as CSV. Setting parallelism 1 on the sink (not on the
    // preceding operator) writes all data into a single output file, which
    // makes the result easier to inspect
    wordCount.writeAsCsv(outPath, "\n", " ").setParallelism(1)
    // Trigger execution
    env.execute("BatchWordCountScala")
  }
}
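To see what the transformation chain computes without a Flink cluster, the same flatMap/map/groupBy/sum steps can be mirrored with plain Scala collections. This is a minimal, Flink-free sketch of the semantics, not part of the job itself:

```scala
// Plain-Scala analogue of the Flink pipeline above.
object WordCountSketch {
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))                           // flatMap(_.split(" "))
      .map((_, 1))                                     // map((_, 1))
      .groupBy(_._1)                                   // groupBy(0)
      .map { case (w, pairs) => (w, pairs.map(_._2).sum) } // sum(1)

  def main(args: Array[String]): Unit = {
    val lines = Seq("I love spark", "I like flink")
    wordCount(lines).toSeq.sortBy(_._1).foreach { case (w, c) => println(s"$w $c") }
  }
}
```

In the real job, `groupBy(0)` and `sum(1)` refer to tuple fields by position; the collections version does the same grouping and summation explicitly.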
The test data in input.txt:
I love spark
I like flink
I love hadoop
I like hadoop
I like hive
I like spark
I like flume
I like linux
Run result:
[root@master sbin]# hdfs dfs -cat /test/output.txt
2023-11-16 23:47:13,365 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
I 8
flink 1
flume 1
hadoop 2
hive 1
like 6
linux 1
love 2
spark 2
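As a sanity check, the counts in the output above can be reproduced from the eight sample lines with plain Scala, independent of Flink and HDFS. This is only a verification sketch:

```scala
object VerifyCounts {
  // The eight sample lines from input.txt above.
  val input: Seq[String] = Seq(
    "I love spark", "I like flink", "I love hadoop", "I like hadoop",
    "I like hive", "I like spark", "I like flume", "I like linux")

  // Same logic as the batch job: split into words, group, count.
  def counts: Map[String, Int] =
    input.flatMap(_.split(" ")).groupBy(identity).map { case (w, ws) => (w, ws.size) }

  def main(args: Array[String]): Unit =
    counts.toSeq.sortBy(_._1).foreach { case (w, c) => println(s"$w $c") }
}
```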