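Developing the Watermark code.
Example: handling out-of-order data
Requirements: data is simulated through a socket in the format 0001,1790820682000, where 1790820682000 is the time at which the record was produced, that is, its EventTime. A map function first converts each record into a tuple2. Next, assignTimestampsAndWatermarks extracts the timestamp and generates the Watermark. A Window then prints information that shows when the Window is triggered, and finally we verify how out-of-order data is handled.
Scala implementation: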
package com.simoniu.flink.scala.window

import java.text.SimpleDateFormat
import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer
import scala.util.Sorting

/**
 * Watermark + EventTime: handling out-of-order data
 * Created by simoniu
 */
object WatermarkOpScalaDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Use the time at which the data was produced: EventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Set the global parallelism to 1
    env.setParallelism(1)
    // Emit watermarks periodically; the default interval is 200 ms
    env.getConfig.setAutoWatermarkInterval(200)

    val text = env.socketTextStream("master", 9001)

    import org.apache.flink.api.scala._
    // Convert each line into a tuple2:
    // the first field is the payload, the second is the timestamp at which the record was produced
    val tupStream = text.map(line => {
      val arr = line.split(",")
      (arr(0), arr(1).toLong)
    })

    // Assign (extract) timestamps and watermarks
    val waterMarkStream = tupStream.assignTimestampsAndWatermarks(
      // Maximum allowed out-of-orderness: 10 s.
      // The explicit type parameter lets Scala infer the element type of the strategy.
      WatermarkStrategy.forBoundedOutOfOrderness[(String, Long)](Duration.ofSeconds(10))
        .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
          val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          var currentMaxTimstamp = 0L

          // Extract the timestamp from the record and use it as the EventTime
          override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long = {
            val timestamp = element._2
            currentMaxTimstamp = Math.max(timestamp, currentMaxTimstamp)
            // Compute the current watermark only so that it can be printed and observed;
            // it has no other effect. watermark = currentMaxTimstamp - OutOfOrderness
            val currentWatermark = currentMaxTimstamp - 10000L
            // This print statement exists only to observe how the values change while learning
            println(
              "key:" + element._1 + "," +
                "eventtime:[" + element._2 + "|" + sdf.format(element._2) + "]," +
                "currentMaxTimstamp:[" + currentMaxTimstamp + "|" + sdf.format(currentMaxTimstamp) + "]," +
                "watermark:[" + currentWatermark + "|" + sdf.format(currentWatermark) + "]")
            element._2
          }
        })
    )

    // Index-based keyBy is kept so that the key is a Tuple and prints as (0001)
    waterMarkStream.keyBy(0)
      // Assign windows by the EventTime of each record; same effect as calling timeWindow
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
      // Process the window contents with full (non-incremental) aggregation
      .apply(new WindowFunction[(String, Long), String, Tuple, TimeWindow] {
        override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
          val keyStr = key.toString
          // Collect the timestamps of the records in this window into arrBuff
          val arrBuff = ArrayBuffer[Long]()
          input.foreach(tup => {
            arrBuff.append(tup._2)
          })
          // Convert arrBuff to an array
          val arr = arrBuff.toArray
          // Sort the timestamps
          Sorting.quickSort(arr)
          val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          // Output: key, record count, earliest and latest event time in the window,
          // and the window start and end time, to make the behavior easy to observe
          val result = keyStr + "," + arr.length + "," + sdf.format(arr.head) + "," + sdf.format(arr.last) + "," + sdf.format(window.getStart) + "," + sdf.format(window.getEnd)
          out.collect(result)
        }
      }).print()

    env.execute("WatermarkOpScala")
  }
}
Enter the test data through the socket.
[root@master ~]# nc -l 9001
0001,1790820682000
0001,1790820686000
0001,1790820692000
0001,1790820693000
0001,1790820694000
Output:
key:0001,eventtime:[1790820682000|2026-10-01 10:11:22],currentMaxTimstamp:[1790820682000|2026-10-01 10:11:22],watermark:[1790820672000|2026-10-01 10:11:12]
key:0001,eventtime:[1790820686000|2026-10-01 10:11:26],currentMaxTimstamp:[1790820686000|2026-10-01 10:11:26],watermark:[1790820676000|2026-10-01 10:11:16]
key:0001,eventtime:[1790820692000|2026-10-01 10:11:32],currentMaxTimstamp:[1790820692000|2026-10-01 10:11:32],watermark:[1790820682000|2026-10-01 10:11:22]
key:0001,eventtime:[1790820693000|2026-10-01 10:11:33],currentMaxTimstamp:[1790820693000|2026-10-01 10:11:33],watermark:[1790820683000|2026-10-01 10:11:23]
key:0001,eventtime:[1790820694000|2026-10-01 10:11:34],currentMaxTimstamp:[1790820694000|2026-10-01 10:11:34],watermark:[1790820684000|2026-10-01 10:11:24]
(0001),1,2026-10-01 10:11:22,2026-10-01 10:11:22,2026-10-01 10:11:21,2026-10-01 10:11:24
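The window bounds in the last output line can be checked by hand. The following is a minimal sketch in plain Scala (no Flink dependency) that rounds each test timestamp down to a multiple of the 3-second window size, which is how a tumbling window with zero offset places records. The object and method names (WindowBoundsSketch, windowStart, windowEnd) are hypothetical helpers introduced only for illustration, and the arithmetic assumes non-negative timestamps and a window offset of 0.

```scala
object WindowBoundsSketch {
  // Round the timestamp down to a multiple of the window size.
  // Assumption: non-negative timestamps and a window offset of 0.
  def windowStart(timestampMs: Long, windowSizeMs: Long): Long =
    timestampMs - (timestampMs % windowSizeMs)

  // The window covers [start, start + size), so the end is exclusive.
  def windowEnd(timestampMs: Long, windowSizeMs: Long): Long =
    windowStart(timestampMs, windowSizeMs) + windowSizeMs

  def main(args: Array[String]): Unit = {
    val sizeMs = 3000L
    // The five event times entered through nc in the test above
    val inputs = Seq(
      1790820682000L, 1790820686000L, 1790820692000L,
      1790820693000L, 1790820694000L)
    inputs.foreach { ts =>
      println(s"$ts -> [${windowStart(ts, sizeMs)}, ${windowEnd(ts, sizeMs)})")
    }
  }
}
```

For the first record, 1790820682000 maps to [1790820681000, 1790820684000), which corresponds to the 10:11:21 to 10:11:24 window printed in the output.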
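From this output, we can see that a Window fires only when both of the following conditions hold.
1: Watermark time >= window_end_time
2: The interval [window_start_time, window_end_time) contains data (note that the interval is closed on the left and open on the right).
In the run above, the window [10:11:21, 10:11:24) already held one record, but it fired only after the record with event time 10:11:34 arrived and advanced the Watermark to 10:11:24.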
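To make the two conditions concrete, here is a small simulation in plain Scala, offered as a sketch rather than a description of Flink's internals. It assumes the simplified formula used in the print statement (watermark = largest event time seen so far minus 10 s), updates the watermark after every record instead of every 200 ms, and drops records whose window has already closed. WindowTriggerSketch, simulate, and the constants are hypothetical names introduced for illustration.

```scala
import scala.collection.mutable

object WindowTriggerSketch {
  val WindowSizeMs = 3000L          // 3-second tumbling window, as in the demo
  val MaxOutOfOrdernessMs = 10000L  // 10-second bound, as in the demo

  def windowStart(ts: Long): Long = ts - (ts % WindowSizeMs)

  /** Returns (window start, window end, record count) for each fired window, in firing order. */
  def simulate(timestamps: Seq[Long]): List[(Long, Long, Int)] = {
    var currentMax = 0L
    // Windows that hold data but have not fired yet: window start -> record count
    val pending = mutable.TreeMap.empty[Long, Int]
    val fired = mutable.ArrayBuffer.empty[(Long, Long, Int)]

    for (ts <- timestamps) {
      val start = windowStart(ts)
      val watermarkBefore = currentMax - MaxOutOfOrdernessMs
      // Simplification: a record whose window already closed is dropped as late
      if (start + WindowSizeMs > watermarkBefore) {
        pending(start) = pending.getOrElse(start, 0) + 1
      }
      currentMax = math.max(currentMax, ts)
      val watermark = currentMax - MaxOutOfOrdernessMs
      // Condition 1: watermark >= window end.
      // Condition 2: the window holds data (only such windows are in `pending`).
      val ready = pending.keys.filter(s => watermark >= s + WindowSizeMs).toList
      for (s <- ready) {
        fired += ((s, s + WindowSizeMs, pending(s)))
        pending.remove(s)
      }
    }
    fired.toList
  }

  def main(args: Array[String]): Unit = {
    val inputs = Seq(
      1790820682000L, 1790820686000L, 1790820692000L,
      1790820693000L, 1790820694000L)
    simulate(inputs).foreach(println)
  }
}
```

With the five test records, only the window starting at 1790820681000 fires, with one record in it, and it fires on the fifth record. With just the first four records nothing fires, because the watermark has only reached 10:11:23.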