消费者代码扩展。
//==================================================
//开启自动提交offset功能,默认就是开启的
prop.put("enable.auto.commit","true");
//自动提交offset的时间间隔,单位是毫秒
prop.put("auto.commit.interval.ms","5000");
/*
注意:正常情况下,kafka消费数据的流程是这样一个流程。
先根据group.id指定的消费者组到kafka中查找之前保存的offset信息,如果查找到了,说明之前使用这个消费者组消费过数据,则根据之前保存的offset继续进行消费。
如果没查找到(说明第一次消费),或者查找到了,但是查找到的那个offset对应的数据已经不存在了,这个时候消费者该如何消费数据?(因为kafka默认只会保存7天的数据,超过时间数据会被删除)
此时会根据auto.offset.reset的值执行不同的消费逻辑。
这个参数的值有三种:[earliest,latest,none]
earliest:表示从最早的数据开始消费(从头消费)
latest【默认】:表示从最新的数据开始消费
none:如果根据指定的group.id没有找到之前消费的offset信息,就会抛异常
解释:【查找到了,但是查找到的那个offset对应的数据已经不存在了】
假设你第一天使用一个消费者去消费了一条数据,然后就把消费者停掉了,等了7天之后,你又使用这个消费者去消费数据,这个时候,这个消费者启动的时候会到kafka里面查询它之前保存的offset信息,但是那个offset对应的数据已经被删了,所以此时再根据这个offset去消费是消费不到数据的。
总结,一般在实时计算的场景下,这个参数的值建议设置为latest,消费最新的数据。
这个参数只有在消费者第一次消费数据,或者之前保存的offset信息已过期的情况下才会生效
*/
prop.put("auto.offset.reset","latest");
//==================================================
我们来验证一下,上面的规则。接上小节的案例,首先启动一次生产者,再启动一次消费者,看看消费者能不能消费到这条数据,如果能消费到,就说明此时是根据上次保存的offset信息进行消费了。
生产者代码:
package com.example.kafka.producer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
/**
* 需求:Java代码实现生产者代码
* @author simoniu
*/
public class ProducerDemo {
public static void main(String[] args) {
Properties prop = new Properties();
//指定kafka的broker地址
prop.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
//指定key-value数据的序列化格式
prop.put("key.serializer", StringSerializer.class.getName());
prop.put("value.serializer", StringSerializer.class.getName());
//指定topic
String topic = "mytopic1";
//创建kafka生产者
KafkaProducer<String, String> producer = new KafkaProducer<String,String>(prop);
//向topic中生产数据
producer.send(new ProducerRecord<String, String>(topic, "welcome to kafka world1!"));
//关闭链接
producer.close();
}
}
消费者代码:
package com.example.kafka.consumer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
/**
* 需求:Java代码实现消费者代码
* @author simoniu
*/
public class ConsumerDemo {
public static void main(String[] args) {
Properties prop = new Properties();
//指定kafka的broker地址
prop.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
//指定key-value的反序列化类型
prop.put("key.deserializer", StringDeserializer.class.getName());
prop.put("value.deserializer", StringDeserializer.class.getName());
//指定消费者组
prop.put("group.id", "consumer_group-1");
prop.put("auto.offset.reset","latest");
//创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(prop);
Collection<String> topics = new ArrayList<String>();
topics.add("mytopic1");
//订阅指定的topic
consumer.subscribe(topics);
while(true) {
//消费数据【注意:需要修改jdk编译级别为1.8,否则Duration.ofSeconds(1)会语法报错】
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String,String> consumerRecord : poll) {
System.out.println(consumerRecord);
}
}
}
}
结果我们发现是可以消费到的。
ConsumerRecord(topic = mytopic1, partition = 4, leaderEpoch = 9, offset = 2, CreateTime = 1698764940920, serialized key size = -1, serialized value size = 24, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = welcome to kafka world1!)
将auto.offset.reset置为earliest,修改一下group.id的值,相当于使用一个新的消费者,验证一下,看是否能把这个mytopic1中的所有数据都取出来,因为新的消费者第一次肯定是获取不到offset信息的,所以就会根据auto.offset.reset的值来消费数据。
消费者代码:
package com.example.kafka.consumer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
/**
* 需求:Java代码实现消费者代码
* @author simoniu
*/
public class ConsumerDemo {
public static void main(String[] args) {
Properties prop = new Properties();
//指定kafka的broker地址
prop.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
//指定key-value的反序列化类型
prop.put("key.deserializer", StringDeserializer.class.getName());
prop.put("value.deserializer", StringDeserializer.class.getName());
//指定消费者组
prop.put("group.id", "consumer_group-2"); //修改group.id
prop.put("auto.offset.reset","earliest");
//创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(prop);
Collection<String> topics = new ArrayList<String>();
topics.add("mytopic1");
//订阅指定的topic
consumer.subscribe(topics);
while(true) {
//消费数据【注意:需要修改jdk编译级别为1.8,否则Duration.ofSeconds(1)会语法报错】
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String,String> consumerRecord : poll) {
System.out.println(consumerRecord);
}
}
}
}
我们发现确实把之前的所有数据都消费出来了。
ConsumerRecord(topic = mytopic1, partition = 0, leaderEpoch = 3, offset = 0, CreateTime = 1698678820321, serialized key size = -1, serialized value size = 23, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = welcome to kafka world!)
ConsumerRecord(topic = mytopic1, partition = 1, leaderEpoch = 3, offset = 0, CreateTime = 1698681492499, serialized key size = -1, serialized value size = 23, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = welcome to kafka world!)
ConsumerRecord(topic = mytopic1, partition = 1, leaderEpoch = 6, offset = 1, CreateTime = 1698765292958, serialized key size = -1, serialized value size = 24, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = welcome to kafka world2!)
ConsumerRecord(topic = mytopic1, partition = 4, leaderEpoch = 1, offset = 0, CreateTime = 1698679083339, serialized key size = -1, serialized value size = 23, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = welcome to kafka world!)
ConsumerRecord(topic = mytopic1, partition = 4, leaderEpoch = 1, offset = 1, CreateTime = 1698679122899, serialized key size = -1, serialized value size = 23, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = welcome to kafka world!)
ConsumerRecord(topic = mytopic1, partition = 4, leaderEpoch = 9, offset = 2, CreateTime = 1698764940920, serialized key size = -1, serialized value size = 24, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = welcome to kafka world1!)
此时,关闭消费者(需要等待5秒,这样才会提交offset),再重新启动,发现没有消费到数据,说明此时就根据上次保存的offset来消费数据了,因为没有新数据产生,所以就消费不到了。