FlinkBatch Program Development (Scala)
Published: 2023-11-16 15:20:43

1. FlinkBatch in Scala

The previous section used FlinkStreaming to demonstrate real-time processing; FlinkBatch demonstrates Flink's offline (batch) computing capability.

Batch WordCount case. Requirement: count the total number of occurrences of each word in a given file, demonstrating Flink's offline batch-processing capability.

Since the file we want to analyze resides on HDFS, we need to add the hadoop-client dependency to the pom.xml from the previous section.

<properties>
    ...
    <hadoop-client.version>3.1.3</hadoop-client.version>
</properties>

<dependencies>
    ...
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop-client.version}</version>
    </dependency>
</dependencies>

Scala implementation:

package com.simoniu.flink.scala

import org.apache.flink.api.scala.ExecutionEnvironment

/**
 * Requirement: count the total occurrences of each word in a given file
 * Created by simoniu
 */
object BatchWordCountScala {
  def main(args: Array[String]): Unit = {
    // Obtain the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    val inputPath = "hdfs://master:9000/test/input.txt"
    val outPath = "hdfs://master:9000/test/output.txt"

    // Read the data from the file
    val text = env.readTextFile(inputPath)

    // Process the data
    import org.apache.flink.api.scala._ // brings in the implicit TypeInformation required by the Scala API
    val wordCount = text.flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(0)
      .sum(1)
      .setParallelism(1) // parallelism 1 writes all data into a single file, which makes the result easier to inspect

    // Save the result to a file (one record per line, fields separated by a space)
    wordCount.writeAsCsv(outPath, "\n", " ")
    // Trigger execution
    env.execute("BatchWordCountScala")
  }
}
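
The flatMap → map → groupBy(0) → sum(1) pipeline above can be sketched with plain Scala collections, without a Flink cluster or any Flink dependency, to see what it computes. This is a standalone illustration (the object name `WordCountSketch` is made up here), using the same test data as input.txt:

```scala
object WordCountSketch {
  def main(args: Array[String]): Unit = {
    // Same test data as input.txt
    val lines = Seq(
      "I love spark",
      "I like flink",
      "I love hadoop",
      "I like hadoop",
      "I like hive",
      "I like spark",
      "I like flume",
      "I like linux"
    )

    // Collection-level equivalent of flatMap -> map -> groupBy(0) -> sum(1)
    val counts: Map[String, Int] = lines
      .flatMap(_.split(" "))                         // split each line into words
      .map((_, 1))                                   // pair each word with 1
      .groupBy(_._1)                                 // group by the word (field 0)
      .map { case (w, ps) => (w, ps.map(_._2).sum) } // sum the 1s (field 1)

    // Print sorted by word, one "word count" pair per line
    counts.toSeq.sortBy(_._1).foreach { case (w, c) => println(s"$w $c") }
  }
}
```

Running this prints the same counts as the HDFS output shown in the run result below (e.g. `I 8`, `like 6`), which is a quick sanity check of the Flink job's logic.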

The test data in input.txt is as follows:

I love spark
I like flink
I love hadoop
I like hadoop
I like hive
I like spark
I like flume
I like linux

Run result:

[root@master sbin]# hdfs dfs -cat /test/output.txt
2023-11-16 23:47:13,365 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
I 8
flink 1
flume 1
hadoop 2
hive 1
like 6
linux 1
love 2
spark 2
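
For completeness, one way to produce a result like the above is to package the project with Maven and submit it with the `flink run` CLI against a running cluster. This is only a sketch: the jar path below is hypothetical and depends on your project's artifactId/version.

```shell
# Package the project (run from the project root)
mvn clean package

# Submit the job to the Flink cluster; the jar name here is hypothetical
flink run -c com.simoniu.flink.scala.BatchWordCountScala target/flink-demo-1.0-SNAPSHOT.jar
```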