FlinkStreaming程序开发之Scala方式。
在pom.xml中引入flink相关依赖,前面两个是针对java代码的,后面两个是针对scala代码的,最后一个依赖是这对flink1.11这个版本需要添加的。
...
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.6.13</spring-boot.version>
<spark-version>2.4.5</spark-version>
<scala-version>2.11.12</scala-version>
<janino-version>3.0.8</janino-version>
<guava-version>14.0.1</guava-version>
<fastjson.versionn>1.2.68</fastjson.versionn>
<flink.version>1.11.1</flink.version>
<slf4j.version>1.7.25</slf4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<!--
<scope>provided</scope>
-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
<!--
<scope>provided</scope>
-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
<!--注意:在idea等开发工具里面运行代码的时候需要把pom.xml中的scope配置注释掉-->
<!--
<scope>provided</scope>
-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
<!--注意:在idea等开发工具里面运行代码的时候需要把pom.xml中的scope配置注释掉-->
<!--
<scope>provided</scope>
-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- log4j的依赖 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
Flink Job开发步骤如下:
注意:Flink程序是延迟计算的,只有最后调用execute()方法的时候才会真正触发执行程序和Spark类似,Spark中是必须要有action算子才会真正执行。
Streaming WordCount案例:
需求: 通过socket实时产生一些单词,使用flink实时接收数据,对指定时间窗口内(例如:2秒)的数据进行聚合统计,并且把时间窗口内计算的结果打印出来。
scala代码实现如下:
package com.simoniu.flink.scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
/**
* 需求:通过socket实时产生一些单词
* 使用Flink实时接收数据
* 对指定时间窗口内(例如:2秒)的数据进行聚合统计
* 并且把时间窗口内计算的结果打印出来
* Created by simoniu
*/
object SocketWindowWordCountScala {
def main(args: Array[String]): Unit = {
//获取运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//连接socket获取输入数据
val text = env.socketTextStream("master", 9001)
//处理数据
//注意:必须要添加这一行隐式转换的代码,否则下面的flatMap方法会报错
import org.apache.flink.api.scala._
val wordCount = text.flatMap(_.split(" "))//将每一行数据根据空格切分单词
.map((_,1))//每一个单词转换为tuple2的形式(单词,1)
//.keyBy(0)//根据tuple2中的第一列进行分组
.keyBy(tup=>tup._1)//官方推荐使用keySelector选择器选择数据
.timeWindow(Time.seconds(2))//时间窗口为2秒,表示每隔2秒钟计算一次接收到的数据
.sum(1)//使用sum或者reduce都可以
//.reduce((t1,t2)=>(t1._1,t1._2+t2._2))
//使用一个线程执行打印操作
wordCount.print().setParallelism(1)
//执行程序
env.execute("SocketWindowWordCountScala")
}
}
在master主机上上面开启socket,这里可以使用linux的nc命令。
nc(netcat)是一种用于 TCP/IP 网络连接的工具。它可以用来创建 TCP 和 UDP 连接,发送任意数据以及监听和接收连接。使用 nc 命令可以在命令行界面上轻松地与其他计算机建立连接并进行数据传输。在网络调试和测试中,nc 常被用作一种非常方便的工具。除此之外,nc 还可以用来进行端口扫描、数据加密等操作。
#首先安装nc命令。
[root@master ~]# yum install -y nc
#打开9001端口
[root@master ~]# nc -l 9001
此时再启动SocketWindowWordCountScala程序,执行主方法。
输入测试数据。
[root@master ~]# nc -l 9001
hello you
hello me
hello linux
hello you hello me hello linux
#ctrl+C退出
^C
运行结果:
(you,1)
(hello,1)
(hello,1)
(me,1)
(hello,1)
(linux,1)
(me,1)
(linux,1)
(you,1)
(hello,3)