feat(comment): 引入MQ消息重试机制

- 添加Spring Retry依赖及AOP支持
- 配置重试策略与指数退避机制
- 创建SendMqRetryHelper封装MQ发送与重试逻辑
- 替换原有RocketMQTemplate为带重试功能的实现
- 增加线程池配置以支持异步重试任务执行
- 更新测试用例中的评论内容用于验证重试机制
This commit is contained in:
2025-11-04 21:38:28 +08:00
parent 226c28885b
commit eb19d52fcb
9 changed files with 212 additions and 24 deletions

View File

@@ -85,6 +85,19 @@
<artifactId>rocketmq-spring-boot-starter</artifactId> <artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency> </dependency>
<!-- Spring Retry 重试框架 -->
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>
<!-- AOP 切面Spring Retry 重试框架需要) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@@ -3,9 +3,11 @@ package com.hanserwei.hannote.comment.biz;
import org.mybatis.spring.annotation.MapperScan; import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.retry.annotation.EnableRetry;
@SpringBootApplication @SpringBootApplication
@MapperScan("com.hanserwei.hannote.comment.biz.domain.mapper") @MapperScan("com.hanserwei.hannote.comment.biz.domain.mapper")
@EnableRetry
public class HannoteCommentApplication { public class HannoteCommentApplication {
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(HannoteCommentApplication.class, args); SpringApplication.run(HannoteCommentApplication.class, args);

View File

@@ -0,0 +1,34 @@
package com.hanserwei.hannote.comment.biz.config;
import jakarta.annotation.Resource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
@Configuration
public class RetryConfig {
@Resource
private RetryProperties retryProperties;
@Bean
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
// 定义重试策略
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(retryProperties.getMaxAttempts());// 最大重试次数
// 定义间隔策略
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(retryProperties.getInitInterval()); // 初始间隔时间
backOffPolicy.setMultiplier(retryProperties.getMultiplier()); // 延迟倍数默认为2
retryTemplate.setRetryPolicy(retryPolicy);
retryTemplate.setBackOffPolicy(backOffPolicy);
return retryTemplate;
}
}

View File

@@ -0,0 +1,29 @@
package com.hanserwei.hannote.comment.biz.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@ConfigurationProperties(prefix = RetryProperties.PREFIX)
@Component
@Data
public class RetryProperties {
public static final String PREFIX = "retry";
/**
* 最大重试次数
*/
private Integer maxAttempts = 3;
/**
* 初始间隔时间,单位 ms
*/
private Integer initInterval = 1000;
/**
* 乘积(每次乘以 2
*/
private Double multiplier = 2.0;
}

View File

@@ -0,0 +1,37 @@
package com.hanserwei.hannote.comment.biz.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class ThreadPoolConfig {
@Bean(name = "taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(10);
// 最大线程数
executor.setMaxPoolSize(50);
// 队列容量
executor.setQueueCapacity(200);
// 线程活跃时间(秒)
executor.setKeepAliveSeconds(30);
// 线程名前缀
executor.setThreadNamePrefix("NoteExecutor-");
// 拒绝策略:由调用线程处理(一般为主线程)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
// 设置等待时间,如果超过这个时间还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是被没有完成的任务阻塞
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
}

View File

@@ -0,0 +1,87 @@
package com.hanserwei.hannote.comment.biz.retry;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
@Component
@Slf4j
public class SendMqRetryHelper {
@Resource
private RocketMQTemplate rocketMQTemplate;
@Resource
private RetryTemplate retryTemplate;
@Resource(name = "taskExecutor")
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
/**
* 异步发送 MQ
*
* @param topic MQ topic
*/
public void asyncSend(String topic, String body) {
log.info("==> 开始异步发送 MQ, Topic: {}, Body: {}", topic, body);
// 构建消息对象,并将 DTO 转成 Json 字符串设置到消息体中
Message<String> message = MessageBuilder.withPayload(body)
.build();
// 异步发送 MQ 消息,提升接口响应速度
rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("==> 【评论发布】MQ 发送成功SendResult: {}", sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("==> 【评论发布】MQ 发送异常: ", throwable);
handleRetry(topic, message);
}
});
}
/**
* 重试处理
*
* @param topic MQ topic
* @param message 消息对象
*/
private void handleRetry(String topic, Message<String> message) {
// 异步处理
threadPoolTaskExecutor.submit(() -> {
try {
// 通过 retryTemplate 执行重试
retryTemplate.execute((RetryCallback<Void, RuntimeException>) context -> {
log.info("==> 开始重试 MQ 发送, 当前重试次数: {}, 时间: {}", context.getRetryCount() + 1, LocalDateTime.now());
// 同步发送 MQ
rocketMQTemplate.syncSend(topic, message);
return null;
});
} catch (Exception e) {
// 多次重试失败,进入兜底方案
fallback(e, topic, message.getPayload());
}
});
}
/**
* 兜底方案: 将发送失败的 MQ 写入数据库,之后,通过定时任务扫表,将发送失败的 MQ 再次发送,最终发送成功后,将该记录物理删除
*/
private void fallback(Exception e, String topic, String bodyJson) {
log.error("==> 多次发送失败, 进入兜底方案, Topic: {}, bodyJson: {}", topic, bodyJson);
// TODO:
}
}

View File

@@ -10,15 +10,11 @@ import com.hanserwei.hannote.comment.biz.domain.dataobject.CommentDO;
import com.hanserwei.hannote.comment.biz.domain.mapper.CommentDOMapper; import com.hanserwei.hannote.comment.biz.domain.mapper.CommentDOMapper;
import com.hanserwei.hannote.comment.biz.model.dto.PublishCommentMqDTO; import com.hanserwei.hannote.comment.biz.model.dto.PublishCommentMqDTO;
import com.hanserwei.hannote.comment.biz.model.vo.PublishCommentReqVO; import com.hanserwei.hannote.comment.biz.model.vo.PublishCommentReqVO;
import com.hanserwei.hannote.comment.biz.retry.SendMqRetryHelper;
import com.hanserwei.hannote.comment.biz.service.CommentService; import com.hanserwei.hannote.comment.biz.service.CommentService;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.time.LocalDateTime; import java.time.LocalDateTime;
@@ -28,7 +24,7 @@ import java.time.LocalDateTime;
public class CommentServiceImpl extends ServiceImpl<CommentDOMapper, CommentDO> implements CommentService { public class CommentServiceImpl extends ServiceImpl<CommentDOMapper, CommentDO> implements CommentService {
@Resource @Resource
private RocketMQTemplate rocketMQTemplate; private SendMqRetryHelper sendMqRetryHelper;
@Override @Override
public Response<?> publishComment(PublishCommentReqVO publishCommentReqVO) { public Response<?> publishComment(PublishCommentReqVO publishCommentReqVO) {
@@ -54,22 +50,8 @@ public class CommentServiceImpl extends ServiceImpl<CommentDOMapper, CommentDO>
.createTime(LocalDateTime.now()) .createTime(LocalDateTime.now())
.creatorId(creatorId) .creatorId(creatorId)
.build(); .build();
// 构建消息对象,并将 DTO 转成 Json 字符串设置到消息体中 // 发送 MQ 消息,包含重试机制
Message<String> message = MessageBuilder sendMqRetryHelper.asyncSend(MQConstants.TOPIC_PUBLISH_COMMENT, JsonUtils.toJsonString(publishCommentMqDTO));
.withPayload(JsonUtils.toJsonString(publishCommentMqDTO))
.build();
// 异步发送 MQ 消息,提升接口响应速度
rocketMQTemplate.asyncSend(MQConstants.TOPIC_PUBLISH_COMMENT, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("==> 【评论发布】MQ 发送成功SendResult: {}", sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("==> 【评论发布】MQ 发送异常: ", throwable);
}
});
return Response.success(); return Response.success();
} }
} }

View File

@@ -14,4 +14,8 @@ mybatis-plus:
log-impl: org.apache.ibatis.logging.slf4j.Slf4jImpl log-impl: org.apache.ibatis.logging.slf4j.Slf4jImpl
global-config: global-config:
banner: false banner: false
mapper-locations: classpath*:/mapperxml/*.xml mapper-locations: classpath*:/mapperxml/*.xml
retry:
max-attempts: 3 # 最大重试次数
init-interval: 1000 # 初始延迟时间,单位 ms
multiplier: 2 # 每次重试间隔加倍(每次乘以 2

View File

@@ -298,6 +298,6 @@ Authorization: Bearer {{token}}
{ {
"noteId": 1862481582414102549, "noteId": 1862481582414102549,
"content": "这是个评论", "content": "这是第三个评论,测试一下异步消息重试",
"imageUrl": "https://cdn.pixabay.com/photo/2025/10/05/15/06/autumn-9875155_1280.jpg" "imageUrl": "https://cdn.pixabay.com/photo/2025/10/05/15/06/autumn-9875155_1280.jpg"
} }