Developing Watermark Code
Published: 2023-12-09 16:07:37

1. Developing Watermark Code

Example: handling out-of-order data:

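Requirements: Use a socket to simulate incoming data in the format 0001,1790820682000, where 1790820682000 is the time at which the record was produced, that is, its EventTime. A map function then converts each record into a Tuple2, after which assignTimestampsAndWatermarks extracts the timestamp and generates the Watermark. A Window then prints information so that we can verify when the Window is triggered, and finally we verify how out-of-order data is handled.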
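The parsing step described above can be sketched in plain Scala, without any Flink dependency. The object name ParseLineSketch and the helper parseLine are hypothetical and are not part of the job shown in this article; unlike the map function in the job, this sketch returns None for malformed lines instead of throwing.

```scala
// A minimal sketch, assuming each input line has the form "id,eventTimeMillis".
// ParseLineSketch and parseLine are illustrative names, not part of the original job.
import scala.util.Try

object ParseLineSketch {
  // Returns Some((id, eventTime)) for a well-formed line, None otherwise.
  def parseLine(line: String): Option[(String, Long)] =
    line.split(",") match {
      case Array(id, ts) => Try((id.trim, ts.trim.toLong)).toOption
      case _             => None
    }

  def main(args: Array[String]): Unit = {
    println(parseLine("0001,1790820682000"))
    println(parseLine("not a valid record"))
  }
}
```

In a real job, dropping or logging malformed lines this way avoids failing the whole stream on one bad record typed into the socket.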

Scala implementation:

package com.simoniu.flink.scala.window

import java.text.SimpleDateFormat
import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer
import scala.util.Sorting

/**
 * Watermark + EventTime for handling out-of-order data
 * Created by simoniu
 */
object WatermarkOpScalaDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
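    // Use the time at which the data was produced: EventTime
    // (setStreamTimeCharacteristic is deprecated since Flink 1.12, where event time is the default)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Set the global parallelism to 1
    env.setParallelism(1)

    // Emit watermarks periodically; the default interval is 200 ms
    env.getConfig.setAutoWatermarkInterval(200)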


    val text = env.socketTextStream("master", 9001)
    import org.apache.flink.api.scala._
    // Convert each line into a Tuple2:
    // the first field is the data itself, the second is the timestamp at which it was produced
    val tupStream = text.map(line => {
      val arr = line.split(",")
      (arr(0), arr(1).toLong)
    })

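    // Assign (extract) timestamps and generate watermarks
    // The explicit type parameter avoids relying on Scala's inference for the Java generic method
    val waterMarkStream = tupStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[(String, Long)](Duration.ofSeconds(10)) // maximum allowed out-of-orderness: 10 s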
      .withTimestampAssigner(new SerializableTimestampAssigner[Tuple2[String, Long]] {
        val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        var currentMaxTimstamp = 0L

        // Extract the timestamp from each record and use it as the EventTime
        override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long = {
          val timestamp = element._2
          currentMaxTimstamp = Math.max(timestamp, currentMaxTimstamp)
          // Compute the current watermark only so that it can be printed and observed; it has no other effect.
          // watermark = currentMaxTimstamp - OutOfOrderness
          val currentWatermark = currentMaxTimstamp - 10000L
          // This print statement exists only to observe how the values change while learning
          println("key:" + element._1 + "," + "eventtime:[" + element._2 + "|" + sdf.format(element._2) + "],currentMaxTimstamp:[" + currentMaxTimstamp + "|" + sdf.format(currentMaxTimstamp) + "],watermark:[" + currentWatermark + "|" + sdf.format(currentWatermark) + "]")
          element._2
        }
      })
    )

    waterMarkStream.keyBy(0)
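      // Assign windows by the EventTime of each message; this has the same effect as calling timeWindow
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
      // Process the data in the window with full (non-incremental) aggregation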
      .apply(new WindowFunction[Tuple2[String,Long],String,Tuple,TimeWindow] {
        override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
          val keyStr = key.toString
          // Collect the timestamps of the window's records into arrBuff
          val arrBuff = ArrayBuffer[Long]()
          input.foreach(tup=>{
            arrBuff.append(tup._2)
          })
          // Convert arrBuff to an array
          val arr = arrBuff.toArray
          // Sort the array
          Sorting.quickSort(arr)

          val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          // Emit the key, the record count, the smallest and largest timestamps in the window, and the window start and end times, for easy observation
          val result = keyStr+","+arr.length+","+sdf.format(arr.head)+","+sdf.format(arr.last)+","+sdf.format(window.getStart)+","+sdf.format(window.getEnd)
          out.collect(result)
        }
      }).print()

    env.execute("WatermarkOpScala")
  }
}
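To make it easier to predict which window each test record falls into, the following plain-Scala sketch reproduces the usual tumbling-window alignment arithmetic for the 3-second window used above. It assumes a window offset of 0 and non-negative timestamps; WindowBoundsSketch, windowStart and windowBounds are hypothetical names introduced for illustration and are not Flink APIs.

```scala
// A minimal sketch, assuming tumbling windows aligned to the epoch (offset 0)
// and non-negative timestamps. All names here are illustrative, not Flink APIs.
object WindowBoundsSketch {
  // Start of the tumbling window of length `size` (ms) that contains `timestamp` (ms).
  def windowStart(timestamp: Long, size: Long): Long =
    timestamp - (timestamp % size)

  // The window is the half-open interval [start, end).
  def windowBounds(timestamp: Long, size: Long): (Long, Long) = {
    val start = windowStart(timestamp, size)
    (start, start + size)
  }

  def main(args: Array[String]): Unit = {
    val sizeMs = 3000L
    val events = Seq(1790820682000L, 1790820686000L, 1790820692000L, 1790820693000L, 1790820694000L)
    events.foreach { ts =>
      val (start, end) = windowBounds(ts, sizeMs)
      println(s"event $ts -> window [$start, $end)")
    }
  }
}
```

For the first test record, 1790820682000, this gives the window [1790820681000, 1790820684000), which corresponds to 10:11:21 through 10:11:24 in the timezone used in this article.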

Enter test data through the socket.

[root@master ~]# nc -l 9001
0001,1790820682000
0001,1790820686000
0001,1790820692000
0001,1790820693000
0001,1790820694000

Output:

key:0001,eventtime:[1790820682000|2026-10-01 10:11:22],currentMaxTimstamp:[1790820682000|2026-10-01 10:11:22],watermark:[1790820672000|2026-10-01 10:11:12]
key:0001,eventtime:[1790820686000|2026-10-01 10:11:26],currentMaxTimstamp:[1790820686000|2026-10-01 10:11:26],watermark:[1790820676000|2026-10-01 10:11:16]
key:0001,eventtime:[1790820692000|2026-10-01 10:11:32],currentMaxTimstamp:[1790820692000|2026-10-01 10:11:32],watermark:[1790820682000|2026-10-01 10:11:22]
key:0001,eventtime:[1790820693000|2026-10-01 10:11:33],currentMaxTimstamp:[1790820693000|2026-10-01 10:11:33],watermark:[1790820683000|2026-10-01 10:11:23]
key:0001,eventtime:[1790820694000|2026-10-01 10:11:34],currentMaxTimstamp:[1790820694000|2026-10-01 10:11:34],watermark:[1790820684000|2026-10-01 10:11:24]
(0001),1,2026-10-01 10:11:22,2026-10-01 10:11:22,2026-10-01 10:11:21,2026-10-01 10:11:24

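From this we can see that a Window is triggered only when the following conditions are met.
1: Watermark time >= window_end_time
2: There is data in the interval [window_start_time, window_end_time) (note that the interval is closed on the left and open on the right).
The Window is triggered only when both conditions hold at the same time.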
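The two conditions can be checked against the test data with a small plain-Scala simulation. This is a sketch under simplifying assumptions: it uses the same simplified formula as the print statement in the job (watermark = largest timestamp seen so far minus 10 seconds) and evaluates the watermark after every record rather than on a 200 ms timer. Flink's built-in bounded-out-of-orderness generator subtracts one additional millisecond and its event-time trigger compares against the window end minus one millisecond, which leads to the same firing point for this data. WindowTriggerSketch and firstTriggeringEvent are hypothetical names.

```scala
// A minimal sketch, assuming watermark = max event time seen so far - maxOutOfOrderness,
// evaluated after each record. Names are illustrative and not part of Flink.
object WindowTriggerSketch {
  val maxOutOfOrderness = 10000L // 10 s, as in the job above

  // Returns the timestamp of the first record after which both conditions hold
  // for the window [start, end): the window holds data and watermark >= end.
  def firstTriggeringEvent(events: Seq[Long], start: Long, end: Long): Option[Long] = {
    var maxTs = Long.MinValue
    var hasData = false
    events.find { ts =>
      maxTs = math.max(maxTs, ts)
      if (ts >= start && ts < end) hasData = true
      val watermark = maxTs - maxOutOfOrderness
      hasData && watermark >= end
    }
  }

  def main(args: Array[String]): Unit = {
    val events = Seq(1790820682000L, 1790820686000L, 1790820692000L, 1790820693000L, 1790820694000L)
    // Window containing the first record: [10:11:21, 10:11:24)
    println(firstTriggeringEvent(events, 1790820681000L, 1790820684000L))
  }
}
```

With the five test records, the window that holds the first record is triggered by the record with timestamp 1790820694000, which matches the point in the output where the window result line appears.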