KafkaProducer的容错
如果Flink开启了CheckPoint,针对FlinkKafkaProducer可以提供EXACTLY_ONCE的语义保证。可以通过semantic参数来选择三种不同的语义:
scala实现如下:
package com.simoniu.flink.scala.kafkaconnector
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner
/**
* Flink向Kafka中生产数据
* Created by simoniu
*/
object StreamKafkaSinkScalaDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//开启checkpoint
env.enableCheckpointing(5000);
//注意:这里的端口不能使用9001,因为CMAK默认占用了 9001端口。
val text = env.socketTextStream("master", 9002)
//指定FlinkKafkaProducer的相关配置
val topic = "t3"
val prop = new Properties()
prop.setProperty("bootstrap.servers","master:9092,slave1:9092,slave2:9092")
//指定kafka作为sink
/*
KafkaSerializationSchemaWrapper的几个参数
1:topic:指定需要写入的topic名称即可
2:partitioner,通过自定义分区器实现将数据写入到指定topic的具体分区中
默认会使用FlinkFixedPartitioner,它表示会将所有的数据都写入指定topic的一个分区里面
如果不想自定义分区器,也不想使用默认的,可以直接使用null即可
3:writeTimeStamp,向topic中写入数据的时候,是否写入时间戳
如果写入了,那么在watermark的案例中,使用extractTimestamp()提起时间戳的时候
就可以直接使用recordTimestamp即可,它表示的就是我们在这里写入的数据对应的timestamp
*/
//val kafkaProducer = new FlinkKafkaProducer[String](topic, new KafkaSerializationSchemaWrapper[String](topic, new FlinkFixedPartitioner[String](), false, new SimpleStringSchema()), prop, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
val kafkaProducer = new FlinkKafkaProducer[String](topic, new KafkaSerializationSchemaWrapper[String](topic, null, false, new SimpleStringSchema()), prop, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
text.addSink(kafkaProducer)
env.execute("StreamKafkaSinkScala")
}
}
注意:此时执行代码会发现无法正常执行,socket打开之后,启动代码,会发现socket监听会自动断开,表示代码执行断开了。
出现这个错误的原因是:生产者中设置的事务超时时间大于broker中设置的事务超时时间。因为Kafka服务中默认事务的超时时间是15min,但是FlinkKafkaProducer里面设置的事务超时时间默认是1h。EXACTLY_ONCE 模式依赖于事务,如果从 Flink 应用程序崩溃到完全重启的时间超过了Kafka的事务超时时间,那么将会有数据丢失,所以我们需要合理地配置事务超时时间,因此在使用 EXACTLY_ONCE 模式之前建议增加 Kafka broker 中transaction.max.timeout.ms 的值。
下面我们需要修改kafka中的server.properties配置文件,master、slave1、slave2都需要修改。
[root@master kafka]# vim config/server.properties
...
transaction.max.timeout.ms=3600000
[root@slave1 kafka]# vim config/server.properties
...
transaction.max.timeout.ms=3600000
[root@slave2 kafka]# vim config/server.properties
...
transaction.max.timeout.ms=3600000
改完配置文件之后,重启kafka,重新执行Flink代码,此时就不报错了。