Kafka Consumer消费策略设置。
针对Kafka Consumer消费数据的时候会有一些策略,我们来看一下:
scala代码实现如下:
package com.simoniu.flink.scala.kafkaconnector
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
/**
* Flink从kafka中消费数据
* Created by simoniu
*/
object StreamKafkaSourceScalaDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//指定FlinkKafkaConsumer相关配置
val topic = "t1"
val prop = new Properties()
prop.setProperty("bootstrap.servers", "master:9092,slave1:9092,slave2:9092")
prop.setProperty("group.id", "con1")
val kafkaConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)
//kafka consumer的消费策略设置
//默认策略,读取group.id对应保存的offset开始消费数据,读取不到则根据kafka中auto.offset.reset参数的值开始消费数据
//kafkaConsumer.setStartFromGroupOffsets()
//从最早的记录开始消费数据,忽略已提交的offset信息
kafkaConsumer.setStartFromEarliest()
//从最新的记录开始消费数据,忽略已提交的offset信息
//kafkaConsumer.setStartFromLatest()
//从指定的时间戳开始消费数据,对于每个分区,其时间戳大于或等于指定时间戳的记录将被作为起始位置
//kafkaConsumer.setStartFromTimestamp(1769498624)
//指定Kafka作为Source
import org.apache.flink.api.scala._
val text = env.addSource(kafkaConsumer)
//将读取到的数据打印到控制台上
text.print()
env.execute("StreamKafkaSourceScala")
}
}