← 返回首页
Springboot整合RocketMQ
发表时间:2023-08-16 15:02:51
Springboot整合RocketMQ

Springboot整合RocketMQ过程如下。

项目结构图如下:

1.添加依赖

  <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <java.version>1.8</java.version>
        <!-- maven打包时跳过测试 -->
        <skipTests>true</skipTests>
        <spring-boot.version>2.6.13</spring-boot.version>
        <mysql.version>8.0.28</mysql.version>
        <mybatis-plus.version>3.5.0</mybatis-plus.version>
        <junit.versoin>4.12</junit.versoin>
        <lombok.version>1.18.24</lombok.version>
        <hutool.version>5.8.18</hutool.version>
        <springboot.rocketmq>2.0.3</springboot.rocketmq>
        <rocketmq.version>5.0.0</rocketmq.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
            <version>${spring-boot.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>${spring-boot.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
            <version>${spring-boot.version}</version>
        </dependency>

        <!--mysql的驱动-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
            <version>${mysql.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
            <version>${lombok.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <version>${spring-boot.version}</version>
        </dependency>

        <!--mybatis-plus的依赖-->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>${mybatis-plus.version}</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
            <version>${junit.versoin}</version>
        </dependency>
        <!--糊涂工具类依赖-->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>${hutool.version}</version>
        </dependency>

        <!--springboot2.x以后默认使用lettuce-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <version>${spring-boot.version}</version>
        </dependency>

        <!-- rocketmq依赖 -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>${springboot.rocketmq}</version>
        </dependency>

        <!--
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>
        -->
    </dependencies>

2.application.yml中添加配置

#yml格式配置文档范例
server:
  #端口配置
  port: 8888
  #上下文
  servlet:
    context-path: /mqdemo
#数据源配置
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/springboot?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
    username: root
    password: root
    driver-class-name: com.mysql.cj.jdbc.Driver
    #Springboot2.0 的hikari配置
    hikari:
      minimum-idle: 5
      #空闲连接存活最大时间,默认600000(10分钟)
      idle-timeout: 180000
      #连接池最大连接数,默认是10
      maximum-pool-size: 10
      #此属性控制从池返回的连接的默认自动提交行为,默认值:true
      auto-commit: true
      #连接池名称
      pool-name: MyHikariCP
      #此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟
      max-lifetime: 1800000
      #数据库连接超时时间,默认30秒,即30000
      connection-timeout: 30000
      connection-test-query: SELECT 1
  jpa:
    show-sql: true
    properties:
      hibernate:
        dialect: org.hibernate.dialect.MySQL5InnoDBDialect
        format_sql: true
        enable_lazy_load_no_trans: true
        hbm2ddl:
          auto: update

# 以下属于mybatis-plus的基本配置,请粘贴复制
mybatis-plus:
  global-config:
    db-config:
      logic-delete-field: flag # 全局逻辑删除的实体字段名(since 3.3.0,配置后可以忽略不配置步骤2)
      logic-delete-value: 1 # 逻辑已删除值(默认为 1)
      logic-not-delete-value: 0 # 逻辑未删除值(默认为 0)
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
    #在映射实体或者属性时,将数据库中表名和字段名中的下划线去掉,按照驼峰命名法映射 address_book ---> addressBook
    map-underscore-to-camel-case: true

#以下是rocketmq的配置
rocketmq:
  name-server: 127.0.0.1:9876 # 访问地址
  producer:
    group: Pro_Group # 必须指定group
    send-message-timeout: 3000 # 消息发送超时时长,默认3s
    retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2
    retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2

3.编写消息实体类

import java.io.Serializable;

@Data
@Builder
public class Users implements Serializable {
    private Integer uid;
    private String username;
    private String password;
}

4.编写生产者

package com.simoniu.springbootrocketmqdemo.service;

import com.alibaba.fastjson.JSON;
import com.simoniu.springbootrocketmqdemo.entity.Users;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class MQProducerService {

    @Value("${rocketmq.producer.send-message-timeout}")
    private Integer messageTimeOut;

    // 建议正常规模项目统一用一个TOPIC
    private static final String topic = "RLT_TEST_TOPIC";

    // 直接注入使用,用于发送消息到broker服务器
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 普通发送(这里的参数对象User可以随意定义,可以发送个对象,也可以是字符串等)
     */
    public void send(Users user) {
        rocketMQTemplate.convertAndSend(topic + ":tag1", user);
       //rocketMQTemplate.send(topic + ":tag1", MessageBuilder.withPayload(user).build()); // 等价于上面一行
    }

    /**
     * 发送同步消息(阻塞当前线程,等待broker响应发送结果,这样不太容易丢失消息)
     * (msgBody也可以是对象,sendResult为返回的发送结果)
     */
    public SendResult sendMsg(String msgBody) {
        SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build());
        System.out.println("【sendMsg】sendResult="+ JSON.toJSONString(sendResult));
        return sendResult;
    }

    /**
     * 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑)
     * (适合对响应时间敏感的业务场景)
     */
    public void sendAsyncMsg(String msgBody) {
        rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(msgBody).build(), new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                // 处理消息发送成功逻辑
            }
            @Override
            public void onException(Throwable throwable) {
                // 处理消息发送异常逻辑
            }
        });
    }

    /**
     * 发送延时消息(上面的发送同步消息,delayLevel的值就为0,因为不延时)
     * 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     */
    public void sendDelayMsg(String msgBody, int delayLevel) {
        rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build(), messageTimeOut, delayLevel);
    }

    /**
     * 发送单向消息(只负责发送消息,不等待应答,不关心发送结果,如日志)
     */
    public void sendOneWayMsg(String msgBody) {
        rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(msgBody).build());
    }

    /**
     * 发送带tag的消息,直接在topic后面加上":tag"
     */
    public SendResult sendTagMsg(String msgBody) {
        return rocketMQTemplate.syncSend(topic + ":tag2", MessageBuilder.withPayload(msgBody).build());
    }

}

5.编写消费者

package com.simoniu.springbootrocketmqdemo.service;


import com.alibaba.fastjson.JSON;
import com.simoniu.springbootrocketmqdemo.entity.Users;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

@Component
public class MQConsumerService {

    // topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意
    // selectorExpression的意思指的就是tag,默认为“*”,不设置的话会监听所有消息
    @Service
    @RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag1", consumerGroup = "Con_Group_One")
    public class ConsumerSend implements RocketMQListener<Users> {
        // 监听到消息就会执行此方法
        @Override
        public void onMessage(Users user) {
            System.out.println("监听到消息:users="+ JSON.toJSONString(user));
        }
    }

    // 注意:这个ConsumerSend2和上面ConsumerSend在没有添加tag做区分时,不能共存,
    // 不然生产者发送一条消息,这两个都会去消费,如果类型不同会有一个报错,所以实际运用中最好加上tag,写这只是让你看知道就行
    @Service
    @RocketMQMessageListener(topic = "RLT_TEST_TOPIC", consumerGroup = "Con_Group_Two")
    public class ConsumerSend2 implements RocketMQListener<String> {
        @Override
        public void onMessage(String str) {
            System.out.println("监听到消息:str="+ str);
        }
    }

    // MessageExt:是一个消息接收通配符,不管发送的是String还是对象,都可接收,当然也可以像上面明确指定类型(我建议还是指定类型较方便)
    @Service
    @RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag2", consumerGroup = "Con_Group_Three")
    public class Consumer implements RocketMQListener<MessageExt> {
        @Override
        public void onMessage(MessageExt messageExt) {
            byte[] body = messageExt.getBody();
            String msg = new String(body);
            System.out.println("监听到消息:msg="+ msg);
        }
    }

}

6.编写测试接口

package com.simoniu.springbootrocketmqdemo.controller;

import com.simoniu.springbootrocketmqdemo.entity.Users;
import com.simoniu.springbootrocketmqdemo.json.R;
import com.simoniu.springbootrocketmqdemo.service.MQProducerService;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/rocketmq")
public class RocketMQController {

    @Autowired
    private MQProducerService mqProducerService;

    @GetMapping("/send")
    public void send() {
        Users user = Users.builder().uid(100).username("admin").password("123456").build();
        mqProducerService.send(user);
    }

    @GetMapping("/sendTag")
    public R sendTag() {
        SendResult sendResult = mqProducerService.sendTagMsg("带有tag的字符消息");
        return R.success("发送成功!",sendResult);
    }

}

7.postman测试接口

用postman调用测试: 1). http://localhost:8888/mqdemo/rocketmq/send

监听到消息:str={"uid":100,"username":"admin","password":"123456"}
监听到消息:users={"password":"123456","uid":100,"username":"admin"}

2).http://localhost:8888/mqdemo/rocketmq/sendTag

总结: