← 返回首页
KafkaConsumer消费策略设置
发表时间:2023-12-20 08:13:52
KafkaConsumer消费策略设置

Kafka Consumer消费策略设置。

1.Kafka Consumer消费策略设置

针对Kafka Consumer消费数据的时候会有一些策略,我们来看一下:

scala代码实现如下:

package com.simoniu.flink.scala.kafkaconnector

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

/**
 * Flink从kafka中消费数据
 * Created by simoniu
 */
object StreamKafkaSourceScalaDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //指定FlinkKafkaConsumer相关配置
    val topic = "t1"
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "master:9092,slave1:9092,slave2:9092")
    prop.setProperty("group.id", "con1")
    val kafkaConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)

    //kafka consumer的消费策略设置
    //默认策略,读取group.id对应保存的offset开始消费数据,读取不到则根据kafka中auto.offset.reset参数的值开始消费数据
    //kafkaConsumer.setStartFromGroupOffsets()
    //从最早的记录开始消费数据,忽略已提交的offset信息
    kafkaConsumer.setStartFromEarliest()
    //从最新的记录开始消费数据,忽略已提交的offset信息
    //kafkaConsumer.setStartFromLatest()
    //从指定的时间戳开始消费数据,对于每个分区,其时间戳大于或等于指定时间戳的记录将被作为起始位置
    //kafkaConsumer.setStartFromTimestamp(1769498624)

    //指定Kafka作为Source
    import org.apache.flink.api.scala._
    val text = env.addSource(kafkaConsumer)

    //将读取到的数据打印到控制台上
    text.print()
    env.execute("StreamKafkaSourceScala")
  }

}