← 返回首页
Java实现生产者和消费者
发表时间:2023-10-30 14:38:51
Java实现生产者和消费者

Java实现生产者和消费者。

前面我们使用基于console的生产者和消费者对topic实现了数据的生产和消费,这个基于控制台的生产者和消费者主要用于测试。在实际工作中,我们有时候需要将生产者和消费者功能集成到我们已有的系统中,此时就需要写代码实现生产者和消费者的逻辑了。在这我们使用java代码来实现生产者和消费者的功能。

项目结构图如下:

pom.xml添加kafka-client依赖。

<properties>
    <java.version>1.8</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <spring-boot.version>2.6.13</spring-boot.version>
    <kafka-client.version>2.4.1</kafka-client.version>
</properties>
...
<dependencies>
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-web</artifactId>
     </dependency>
     <dependency>
         <groupId>org.projectlombok</groupId>
         <artifactId>lombok</artifactId>
         <optional>true</optional>
     </dependency>
     <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
     </dependency>

     <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka-client.version}</version>
     </dependency>
</dependencies>

1.生产者实现

package com.example.kafka.producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

/**
 * 需求:Java代码实现生产者代码
 * @author simoniu
 */
public class ProducerDemo {

    public static void main(String[] args) {

        Properties prop = new Properties();
        //指定kafka的broker地址
        prop.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
        //指定key-value数据的序列化格式
        prop.put("key.serializer", StringSerializer.class.getName());
        prop.put("value.serializer", StringSerializer.class.getName());

        //指定topic
        String topic = "mytopic1";

        //创建kafka生产者
        KafkaProducer<String, String> producer = new KafkaProducer<String,String>(prop);

        //向topic中生产数据
        producer.send(new ProducerRecord<String, String>(topic, "welcome to kafka world!"));

        //关闭链接
        producer.close();

    }

}

2.消费者实现

package com.example.kafka.consumer;


import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
/**
 * 需求:Java代码实现消费者代码
 * @author simoniu
 */
public class ConsumerDemo {

    public static void main(String[] args) {

        Properties prop = new Properties();
        //指定kafka的broker地址
        prop.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
        //指定key-value的反序列化类型
        prop.put("key.deserializer", StringDeserializer.class.getName());
        prop.put("value.deserializer", StringDeserializer.class.getName());
        //指定消费者组
        prop.put("group.id", "consumer_group-1");

        //创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(prop);
        Collection<String> topics = new ArrayList<String>();
        topics.add("mytopic1");
        //订阅指定的topic
        consumer.subscribe(topics);

        while(true) {
            //消费数据【注意:需要修改jdk编译级别为1.8,否则Duration.ofSeconds(1)会语法报错】
            ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String,String> consumerRecord : poll) {
                System.out.println(consumerRecord);
            }
        }

    }
}

测试运行。

在master主机检查是否存在名字叫mytopic1的主题。

[root@master kafka]# bin/kafka-topics.sh --describe --zookeeper master:2181 --topic mytopic1
Topic: mytopic1 PartitionCount: 5       ReplicationFactor: 2    Configs: 
        Topic: mytopic1 Partition: 0    Leader: 0       Replicas: 0,1   Isr: 1,0
        Topic: mytopic1 Partition: 1    Leader: 2       Replicas: 1,2   Isr: 2,1
        Topic: mytopic1 Partition: 2    Leader: 2       Replicas: 2,0   Isr: 2,0
        Topic: mytopic1 Partition: 3    Leader: 2       Replicas: 0,2   Isr: 2,0
        Topic: mytopic1 Partition: 4    Leader: 1       Replicas: 1,0   Isr: 1,0

首先启动Consumer,再启动Producer。在kafka的消费者端就可以看到消费出来的数据了。

ConsumerRecord(topic = mytopic1, partition = 1, leaderEpoch = 3, offset = 0, CreateTime = 1698681492499, serialized key size = -1, serialized value size = 23, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = welcome to kafka world!)