← 返回首页
延迟数据的三种处理方式
发表时间:2023-12-19 15:25:44
延迟数据的三种处理方式

延迟数据的三种处理方式。

1.延迟数据的三种处理方式

针对延迟太久的数据有三种处理方案。

  1. 丢弃(默认)
  2. allowedLateness指定允许数据延迟的时间
  3. sideOutputLateData收集迟到的数据

Flink的watermark针对延迟太久的数据因为没有触发Window。因为输入的数据所在的窗口在前面已经执行过了,Flink默认对这些迟到太久的数据的处理方案就是丢弃。这里我们不再探讨。

1).allowedLateness

在某些情况下,我们希望对迟到的数据再提供一个宽容的时间。Flink提供了allowedLateness方法可以实现对迟到的数据设置一个延迟时间,在指定延迟时间内到达的数据还是可以触发window执行的。

只需要增加这一行代码即可:

    waterMarkStream.keyBy(0)
      //按照消息的EventTime分配窗口,和调用TimeWindow效果一样
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
      //允许数据迟到2秒
      .allowedLateness(Time.seconds(2))
     ...

2).sideOutputLateData

通过sideOutputLateData函数可以把迟到的数据统一收集,统一存储,方便后期排查问题。

代码调整如下:

    val resStream = waterMarkStream.keyBy(0)
      //按照消息的EventTime分配窗口,和调用TimeWindow效果一样
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
      //保存被丢弃的数据
      .sideOutputLateData(outputTag)

    ...
    //把迟到的数据取出来,暂时打印到控制台,实际工作中可以选择存储到其它存储介质中
    //例如:redis,kafka
    val sideOutput = resStream.getSideOutput(outputTag)
    sideOutput.print()

    //将流中的结果数据也打印到控制台
    resStream.print()
    env.execute("WatermarkOpScala")

此时,针对这几条迟到的数据,都通过sideOutputLateData保存到了outputTag中。