← 返回首页
KafkaConsumer的容错
发表时间:2023-12-20 08:25:04
KafkaConsumer的容错

KafkaConsumer的容错

1.KafkaConsumer的容错

Flink中也有checkpoint机制,Checkpoint是Flink实现容错机制的核心功能,它能够根据配置周期性地基于流中各个算子任务的State来生成快照,从而将这些State数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常。

当CheckPoint机制开启的时候,Consumer会定期把Kafka的offset信息还有其它算子任务的State信息一块保存起来,当Job失败重启的时候,Flink会从最近一次的CheckPoint中进行恢复数据,重新消费Kafka中的数据。为了能够使用支持容错的Consumer,需要开启checkpoint。

2.如何开启Checkpoint

使用env.enableCheckpointing方法开启Checkpoint。

//每隔5000 ms执行一次Checkpoint(设置Checkpoint的周期)
env.enableCheckpointing(5000)

针对checkpoint还有一些相关的配置。

//设置模式为.EXACTLY_ONCE (这是默认值) ,还可以设置为AT_LEAST_ONCE
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
//确保两次Checkpoint之间有至少多少 ms的间隔(checkpoint最小间隔)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
//Checkpoint必须在一分钟内完成,或者被丢弃(checkpoint的超时时间)
env.getCheckpointConfig.setCheckpointTimeout(60000)
//同一时间只允许执行一个Checkpoint
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
//表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

最后还有一个配置,设置State数据存储的位置。默认情况下,State数据会保存在TaskManager的内存中,Checkpoint执行时,会将State数据存储在JobManager的内存中。

具体的存储位置取决于State Backend的配置,Flink 一共提供了3种存储方式。

这里我们使用第三种:RocksDBStateBackend,针对RocksDBStateBackend需要引入依赖。

<properties>
  ...
  <flink.version>1.11.1</flink.version>
</properties>

<dependencies>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
      <version>${flink.version}</version>
   </dependency>
</dependencies>

使用scala测试RocksDBStateBackend。

package com.simoniu.flink.scala.kafkaconnector

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

/**
 * Flink从kafka中消费数据
 * Created by simoniu
 */
object StreamKafkaSourceScalaDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //每隔5000 ms执行一次Checkpoint(设置Checkpoint的周期)
    env.enableCheckpointing(5000)

    //针对checkpoint的相关配置
    //设置模式为.EXACTLY_ONCE (这是默认值) ,还可以设置为AT_LEAST_ONCE
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    //确保两次Checkpoint之间有至少多少 ms的间隔(checkpoint最小间隔)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    //Checkpoint必须在一分钟内完成,或者被丢弃(checkpoint的超时时间)
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    //同一时间只允许执行一个Checkpoint
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    //表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    //设置状态数据存储的位置
    env.setStateBackend(new RocksDBStateBackend("hdfs://master:9000/flink/checkpoints", true))

    //指定FlinkKafkaConsumer相关配置
    val topic = "t1"
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "master:9092,slave1:9092,slave2:9092")
    prop.setProperty("group.id", "con1")
    val kafkaConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)

    //kafka consumer的消费策略设置
    //默认策略,读取group.id对应保存的offset开始消费数据,读取不到则根据kafka中auto.offset.reset参数的值开始消费数据
    //kafkaConsumer.setStartFromGroupOffsets()
    //从最早的记录开始消费数据,忽略已提交的offset信息
    kafkaConsumer.setStartFromEarliest()
    //从最新的记录开始消费数据,忽略已提交的offset信息
    //kafkaConsumer.setStartFromLatest()
    //从指定的时间戳开始消费数据,对于每个分区,其时间戳大于或等于指定时间戳的记录将被作为起始位置
    //kafkaConsumer.setStartFromTimestamp(1769498624)

    //指定Kafka作为Source
    import org.apache.flink.api.scala._
    val text = env.addSource(kafkaConsumer)

    //将读取到的数据打印到控制台上
    text.print()
    env.execute("StreamKafkaSourceScala")
  }
}

然后启动代码,再启动一个Kafka 的 console生产者模拟产生数据,验证效果。

[root@master kafka]# cd $KAFKA_HOME
[root@master kafka]# bin/kafka-console-producer.sh --broker-list master:9092 --topic t1
>hello kafka

在hdfs上观察checkpoint快照文件。

3.Kafka Consumers Offset 自动提交

Kafka Consumers Offset自动提交机制需要根据Job是否开启Checkpoint来区分。 CheckPoint关闭时:通过参数enable.auto.commit和auto.commit.interval.ms控制。 CheckPoint开启时:执行CheckPoint的时候才会提交offset,此时kafka中的自动提交机制就会被忽略。