← 返回首页
KafkaProducer的容错
发表时间:2023-12-20 15:46:18
KafkaProducer的容错

KafkaProducer的容错

1.KafkaProducer的容错

如果Flink开启了CheckPoint,针对FlinkKafkaProducer可以提供EXACTLY_ONCE的语义保证。可以通过semantic参数来选择三种不同的语义:

scala实现如下:

package com.simoniu.flink.scala.kafkaconnector

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner

/**
 * Flink向Kafka中生产数据
 * Created by simoniu
 */
object StreamKafkaSinkScalaDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //开启checkpoint
    env.enableCheckpointing(5000);

    //注意:这里的端口不能使用9001,因为CMAK默认占用了 9001端口。
    val text = env.socketTextStream("master", 9002)

    //指定FlinkKafkaProducer的相关配置
    val topic = "t3"
    val prop = new Properties()
    prop.setProperty("bootstrap.servers","master:9092,slave1:9092,slave2:9092")

    //指定kafka作为sink
    /*
     KafkaSerializationSchemaWrapper的几个参数
     1:topic:指定需要写入的topic名称即可
     2:partitioner,通过自定义分区器实现将数据写入到指定topic的具体分区中
     默认会使用FlinkFixedPartitioner,它表示会将所有的数据都写入指定topic的一个分区里面
     如果不想自定义分区器,也不想使用默认的,可以直接使用null即可
     3:writeTimeStamp,向topic中写入数据的时候,是否写入时间戳
     如果写入了,那么在watermark的案例中,使用extractTimestamp()提起时间戳的时候
     就可以直接使用recordTimestamp即可,它表示的就是我们在这里写入的数据对应的timestamp
     */
    //val kafkaProducer = new FlinkKafkaProducer[String](topic, new KafkaSerializationSchemaWrapper[String](topic, new FlinkFixedPartitioner[String](), false, new SimpleStringSchema()), prop, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)

    val kafkaProducer = new FlinkKafkaProducer[String](topic, new KafkaSerializationSchemaWrapper[String](topic, null, false, new SimpleStringSchema()), prop, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)

    text.addSink(kafkaProducer)
    env.execute("StreamKafkaSinkScala")

  }
}

注意:此时执行代码会发现无法正常执行,socket打开之后,启动代码,会发现socket监听会自动断开,表示代码执行断开了。

出现这个错误的原因是:生产者中设置的事务超时时间大于broker中设置的事务超时时间。因为Kafka服务中默认事务的超时时间是15min,但是FlinkKafkaProducer里面设置的事务超时时间默认是1h。EXACTLY_ONCE 模式依赖于事务,如果从 Flink 应用程序崩溃到完全重启的时间超过了Kafka的事务超时时间,那么将会有数据丢失,所以我们需要合理地配置事务超时时间,因此在使用 EXACTLY_ONCE 模式之前建议增加 Kafka broker 中transaction.max.timeout.ms 的值。

下面我们需要修改kafka中的server.properties配置文件,master、slave1、slave2都需要修改。

[root@master kafka]# vim config/server.properties
...
transaction.max.timeout.ms=3600000

[root@slave1 kafka]# vim config/server.properties
...
transaction.max.timeout.ms=3600000

[root@slave2 kafka]# vim config/server.properties
...
transaction.max.timeout.ms=3600000

改完配置文件之后,重启kafka,重新执行Flink代码,此时就不报错了。