Springboot整合RocketMQ过程如下。
项目结构图如下:

1.添加依赖
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<!-- maven打包时跳过测试 -->
<skipTests>true</skipTests>
<spring-boot.version>2.6.13</spring-boot.version>
<mysql.version>8.0.28</mysql.version>
<mybatis-plus.version>3.5.0</mybatis-plus.version>
<junit.versoin>4.12</junit.versoin>
<lombok.version>1.18.24</lombok.version>
<hutool.version>5.8.18</hutool.version>
<springboot.rocketmq>2.0.3</springboot.rocketmq>
<rocketmq.version>5.0.0</rocketmq.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
<version>${spring-boot.version}</version>
</dependency>
<!--mysql的驱动-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<version>${spring-boot.version}</version>
</dependency>
<!--mybatis-plus的依赖-->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
<version>${junit.versoin}</version>
</dependency>
<!--糊涂工具类依赖-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>${hutool.version}</version>
</dependency>
<!--springboot2.x以后默认使用lettuce-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<!-- rocketmq依赖 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${springboot.rocketmq}</version>
</dependency>
<!--
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
-->
</dependencies>
2.application.yml中添加配置
#yml格式配置文档范例
server:
#端口配置
port: 8888
#上下文
servlet:
context-path: /mqdemo
#数据源配置
spring:
datasource:
url: jdbc:mysql://localhost:3306/springboot?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
#Springboot2.0 的hikari配置
hikari:
minimum-idle: 5
#空闲连接存活最大时间,默认600000(10分钟)
idle-timeout: 180000
#连接池最大连接数,默认是10
maximum-pool-size: 10
#此属性控制从池返回的连接的默认自动提交行为,默认值:true
auto-commit: true
#连接池名称
pool-name: MyHikariCP
#此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟
max-lifetime: 1800000
#数据库连接超时时间,默认30秒,即30000
connection-timeout: 30000
connection-test-query: SELECT 1
jpa:
show-sql: true
properties:
hibernate:
dialect: org.hibernate.dialect.MySQL5InnoDBDialect
format_sql: true
enable_lazy_load_no_trans: true
hbm2ddl:
auto: update
# 以下属于mybatis-plus的基本配置,请粘贴复制
mybatis-plus:
global-config:
db-config:
logic-delete-field: flag # 全局逻辑删除的实体字段名(since 3.3.0,配置后可以忽略不配置步骤2)
logic-delete-value: 1 # 逻辑已删除值(默认为 1)
logic-not-delete-value: 0 # 逻辑未删除值(默认为 0)
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
#在映射实体或者属性时,将数据库中表名和字段名中的下划线去掉,按照驼峰命名法映射 address_book ---> addressBook
map-underscore-to-camel-case: true
#以下是rocketmq的配置
rocketmq:
name-server: 127.0.0.1:9876 # 访问地址
producer:
group: Pro_Group # 必须指定group
send-message-timeout: 3000 # 消息发送超时时长,默认3s
retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2
retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2
3.编写消息实体类
import java.io.Serializable;
@Data
@Builder
public class Users implements Serializable {
private Integer uid;
private String username;
private String password;
}
4.编写生产者
package com.simoniu.springbootrocketmqdemo.service;
import com.alibaba.fastjson.JSON;
import com.simoniu.springbootrocketmqdemo.entity.Users;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class MQProducerService {
@Value("${rocketmq.producer.send-message-timeout}")
private Integer messageTimeOut;
// 建议正常规模项目统一用一个TOPIC
private static final String topic = "RLT_TEST_TOPIC";
// 直接注入使用,用于发送消息到broker服务器
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 普通发送(这里的参数对象User可以随意定义,可以发送个对象,也可以是字符串等)
*/
public void send(Users user) {
rocketMQTemplate.convertAndSend(topic + ":tag1", user);
//rocketMQTemplate.send(topic + ":tag1", MessageBuilder.withPayload(user).build()); // 等价于上面一行
}
/**
* 发送同步消息(阻塞当前线程,等待broker响应发送结果,这样不太容易丢失消息)
* (msgBody也可以是对象,sendResult为返回的发送结果)
*/
public SendResult sendMsg(String msgBody) {
SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build());
System.out.println("【sendMsg】sendResult="+ JSON.toJSONString(sendResult));
return sendResult;
}
/**
* 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑)
* (适合对响应时间敏感的业务场景)
*/
public void sendAsyncMsg(String msgBody) {
rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(msgBody).build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 处理消息发送成功逻辑
}
@Override
public void onException(Throwable throwable) {
// 处理消息发送异常逻辑
}
});
}
/**
* 发送延时消息(上面的发送同步消息,delayLevel的值就为0,因为不延时)
* 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
*/
public void sendDelayMsg(String msgBody, int delayLevel) {
rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build(), messageTimeOut, delayLevel);
}
/**
* 发送单向消息(只负责发送消息,不等待应答,不关心发送结果,如日志)
*/
public void sendOneWayMsg(String msgBody) {
rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(msgBody).build());
}
/**
* 发送带tag的消息,直接在topic后面加上":tag"
*/
public SendResult sendTagMsg(String msgBody) {
return rocketMQTemplate.syncSend(topic + ":tag2", MessageBuilder.withPayload(msgBody).build());
}
}
5.编写消费者
package com.simoniu.springbootrocketmqdemo.service;
import com.alibaba.fastjson.JSON;
import com.simoniu.springbootrocketmqdemo.entity.Users;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
@Component
public class MQConsumerService {
// topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意
// selectorExpression的意思指的就是tag,默认为“*”,不设置的话会监听所有消息
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag1", consumerGroup = "Con_Group_One")
public class ConsumerSend implements RocketMQListener<Users> {
// 监听到消息就会执行此方法
@Override
public void onMessage(Users user) {
System.out.println("监听到消息:users="+ JSON.toJSONString(user));
}
}
// 注意:这个ConsumerSend2和上面ConsumerSend在没有添加tag做区分时,不能共存,
// 不然生产者发送一条消息,这两个都会去消费,如果类型不同会有一个报错,所以实际运用中最好加上tag,写这只是让你看知道就行
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", consumerGroup = "Con_Group_Two")
public class ConsumerSend2 implements RocketMQListener<String> {
@Override
public void onMessage(String str) {
System.out.println("监听到消息:str="+ str);
}
}
// MessageExt:是一个消息接收通配符,不管发送的是String还是对象,都可接收,当然也可以像上面明确指定类型(我建议还是指定类型较方便)
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag2", consumerGroup = "Con_Group_Three")
public class Consumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
String msg = new String(body);
System.out.println("监听到消息:msg="+ msg);
}
}
}
6.编写测试接口
package com.simoniu.springbootrocketmqdemo.controller;
import com.simoniu.springbootrocketmqdemo.entity.Users;
import com.simoniu.springbootrocketmqdemo.json.R;
import com.simoniu.springbootrocketmqdemo.service.MQProducerService;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/rocketmq")
public class RocketMQController {
@Autowired
private MQProducerService mqProducerService;
@GetMapping("/send")
public void send() {
Users user = Users.builder().uid(100).username("admin").password("123456").build();
mqProducerService.send(user);
}
@GetMapping("/sendTag")
public R sendTag() {
SendResult sendResult = mqProducerService.sendTagMsg("带有tag的字符消息");
return R.success("发送成功!",sendResult);
}
}
7.postman测试接口
用postman调用测试: 1). http://localhost:8888/mqdemo/rocketmq/send
监听到消息:str={"uid":100,"username":"admin","password":"123456"}
监听到消息:users={"password":"123456","uid":100,"username":"admin"}
2).http://localhost:8888/mqdemo/rocketmq/sendTag

总结: