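Implementing a producer and consumer in Java.
Earlier we used the console-based producer and consumer to write data to and read data from a topic. Those console tools are mainly for testing. In real projects we sometimes need to integrate producer and consumer functionality into an existing system, which means implementing the logic in code. Here we use Java to implement both.
The project structure is shown below: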

Add the kafka-clients dependency to pom.xml.
<properties>
    <java.version>1.8</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <spring-boot.version>2.6.13</spring-boot.version>
    <kafka-client.version>2.4.1</kafka-client.version>
</properties>
...
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka-client.version}</version>
    </dependency>
</dependencies>
package com.example.kafka.producer;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

/**
 * Goal: implement a Kafka producer in Java.
 * @author simoniu
 */
public class ProducerDemo {
    public static void main(String[] args) {
        Properties prop = new Properties();
        // Kafka broker addresses
        prop.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
        // Serializers for the record key and value
        prop.put("key.serializer", StringSerializer.class.getName());
        prop.put("value.serializer", StringSerializer.class.getName());
        // Target topic
        String topic = "mytopic1";
        // Create the Kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
        // Send one record to the topic (no key is given, so the key is null)
        producer.send(new ProducerRecord<String, String>(topic, "welcome to kafka world!"));
        // Close the producer; close() waits for pending sends to complete
        producer.close();
    }
}
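When the producer is integrated into an existing system, the broker list is often kept outside the code rather than hard-coded. The following is a minimal sketch of that idea using only the JDK; the class name `ProducerSettings`, the method `fromText`, and the idea of a separate properties file are assumptions for illustration, not part of the original project.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper: builds producer settings from externalized text. */
final class ProducerSettings {
    private ProducerSettings() {}

    /** Parses text in java.util.Properties format and fills in String serializers if absent. */
    static Properties fromText(String text) {
        Properties prop = new Properties();
        try {
            prop.load(new StringReader(text));
        } catch (IOException e) {
            // Not expected for an in-memory reader; rethrown unchecked for simplicity.
            throw new UncheckedIOException(e);
        }
        // Serializer class names as plain strings, the same values that
        // StringSerializer.class.getName() produces in ProducerDemo.
        prop.putIfAbsent("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        prop.putIfAbsent("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return prop;
    }

    public static void main(String[] args) {
        // Stand-in for the contents of a hypothetical properties file.
        String text = "bootstrap.servers=master:9092,slave1:9092,slave2:9092\n";
        Properties prop = fromText(text);
        System.out.println(prop.getProperty("bootstrap.servers"));
        System.out.println(prop.getProperty("key.serializer"));
    }
}
```

The resulting `Properties` object could be passed to the `KafkaProducer` constructor in place of the hand-built `prop` in `ProducerDemo`.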
package com.example.kafka.consumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
 * Goal: implement a Kafka consumer in Java.
 * @author simoniu
 */
public class ConsumerDemo {
    public static void main(String[] args) {
        Properties prop = new Properties();
        // Kafka broker addresses
        prop.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
        // Deserializers for the record key and value
        prop.put("key.deserializer", StringDeserializer.class.getName());
        prop.put("value.deserializer", StringDeserializer.class.getName());
        // Consumer group
        prop.put("group.id", "consumer_group-1");
        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        Collection<String> topics = new ArrayList<>();
        topics.add("mytopic1");
        // Subscribe to the topic
        consumer.subscribe(topics);
        while (true) {
            // Fetch records, waiting up to one second.
            // Note: the compiler level must be JDK 1.8 or later; otherwise
            // java.time.Duration is unavailable and this line fails to compile.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : records) {
                System.out.println(consumerRecord);
            }
        }
    }
}
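The consumer above leaves `auto.offset.reset` at its default of `latest`, so a consumer group with no committed offsets only sees records produced after it has joined. If records written before the consumer started should also be read, the setting can be changed to `earliest`. The sketch below builds such a configuration using only the JDK; the class `ConsumerSettings` and the method `fromBeginning` are hypothetical names introduced for illustration.

```java
import java.util.Properties;

/** Hypothetical helper: consumer settings that start from the earliest available offset. */
final class ConsumerSettings {
    private ConsumerSettings() {}

    static Properties fromBeginning(String bootstrapServers, String groupId) {
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", bootstrapServers);
        prop.setProperty("group.id", groupId);
        prop.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Applies only when the group has no committed offset for a partition.
        prop.setProperty("auto.offset.reset", "earliest");
        return prop;
    }

    public static void main(String[] args) {
        Properties prop = fromBeginning("master:9092,slave1:9092,slave2:9092", "consumer_group-1");
        System.out.println(prop.getProperty("auto.offset.reset"));
    }
}
```

Because the setting applies only when no offset has been committed, a group that has already consumed from the topic resumes from its committed position regardless of this value.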
Test run.
On the master host, check that a topic named mytopic1 exists.
[root@master kafka]# bin/kafka-topics.sh --describe --zookeeper master:2181 --topic mytopic1
Topic: mytopic1 PartitionCount: 5 ReplicationFactor: 2 Configs:
Topic: mytopic1 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 1,0
Topic: mytopic1 Partition: 1 Leader: 2 Replicas: 1,2 Isr: 2,1
Topic: mytopic1 Partition: 2 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: mytopic1 Partition: 3 Leader: 2 Replicas: 0,2 Isr: 2,0
Topic: mytopic1 Partition: 4 Leader: 1 Replicas: 1,0 Isr: 1,0
Start the Consumer first, then the Producer. The consumed record then appears in the consumer's console output.
ConsumerRecord(topic = mytopic1, partition = 1, leaderEpoch = 3, offset = 0, CreateTime = 1698681492499, serialized key size = -1, serialized value size = 23, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = welcome to kafka world!)
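Two fields in the printed record are easier to read with a little arithmetic: `CreateTime` is a timestamp in milliseconds since the Unix epoch, and `serialized value size` is the byte length of the value after serialization (UTF-8 by default for `StringSerializer`). A `serialized key size` of -1 indicates a null key, which matches the producer sending no key. The following JDK-only sketch checks the first two; the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;

/** Hypothetical helper for interpreting fields of a printed ConsumerRecord. */
final class RecordFieldsSketch {
    private RecordFieldsSketch() {}

    /** Converts a CreateTime value (milliseconds since the Unix epoch) to a UTC instant. */
    static Instant createTimeToInstant(long createTimeMillis) {
        return Instant.ofEpochMilli(createTimeMillis);
    }

    /** Byte length of a string once encoded as UTF-8. */
    static int utf8Size(String value) {
        return value.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        // Values copied from the record printed above.
        System.out.println(createTimeToInstant(1698681492499L)); // 2023-10-30T15:58:12.499Z
        System.out.println(utf8Size("welcome to kafka world!")); // 23
    }
}
```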