Compare commits
2 Commits
226c28885b
...
c37b16ff42
| Author | SHA1 | Date | |
|---|---|---|---|
| c37b16ff42 | |||
| eb19d52fcb |
@@ -85,6 +85,25 @@
|
||||
<artifactId>rocketmq-spring-boot-starter</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- Spring Retry 重试框架 -->
|
||||
<dependency>
|
||||
<groupId>org.springframework.retry</groupId>
|
||||
<artifactId>spring-retry</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- AOP 切面(Spring Retry 重试框架需要) -->
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-aop</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- Rocket MQ 客户端 -->
|
||||
<dependency>
|
||||
<groupId>org.apache.rocketmq</groupId>
|
||||
<artifactId>rocketmq-client</artifactId>
|
||||
</dependency>
|
||||
|
||||
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
||||
@@ -3,9 +3,11 @@ package com.hanserwei.hannote.comment.biz;
|
||||
import org.mybatis.spring.annotation.MapperScan;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.retry.annotation.EnableRetry;
|
||||
|
||||
@SpringBootApplication
|
||||
@MapperScan("com.hanserwei.hannote.comment.biz.domain.mapper")
|
||||
@EnableRetry
|
||||
public class HannoteCommentApplication {
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(HannoteCommentApplication.class, args);
|
||||
|
||||
@@ -0,0 +1,34 @@
|
||||
package com.hanserwei.hannote.comment.biz.config;
|
||||
|
||||
import jakarta.annotation.Resource;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
|
||||
import org.springframework.retry.policy.SimpleRetryPolicy;
|
||||
import org.springframework.retry.support.RetryTemplate;
|
||||
|
||||
@Configuration
|
||||
public class RetryConfig {
|
||||
|
||||
@Resource
|
||||
private RetryProperties retryProperties;
|
||||
|
||||
@Bean
|
||||
public RetryTemplate retryTemplate() {
|
||||
RetryTemplate retryTemplate = new RetryTemplate();
|
||||
|
||||
// 定义重试策略
|
||||
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
|
||||
retryPolicy.setMaxAttempts(retryProperties.getMaxAttempts());// 最大重试次数
|
||||
|
||||
// 定义间隔策略
|
||||
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
|
||||
backOffPolicy.setInitialInterval(retryProperties.getInitInterval()); // 初始间隔时间
|
||||
backOffPolicy.setMultiplier(retryProperties.getMultiplier()); // 延迟倍数,默认为2
|
||||
|
||||
retryTemplate.setRetryPolicy(retryPolicy);
|
||||
retryTemplate.setBackOffPolicy(backOffPolicy);
|
||||
|
||||
return retryTemplate;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
package com.hanserwei.hannote.comment.biz.config;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@ConfigurationProperties(prefix = RetryProperties.PREFIX)
|
||||
@Component
|
||||
@Data
|
||||
public class RetryProperties {
|
||||
|
||||
public static final String PREFIX = "retry";
|
||||
|
||||
/**
|
||||
* 最大重试次数
|
||||
*/
|
||||
private Integer maxAttempts = 3;
|
||||
|
||||
/**
|
||||
* 初始间隔时间,单位 ms
|
||||
*/
|
||||
private Integer initInterval = 1000;
|
||||
|
||||
/**
|
||||
* 乘积(每次乘以 2)
|
||||
*/
|
||||
private Double multiplier = 2.0;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
package com.hanserwei.hannote.comment.biz.config;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
@Configuration
|
||||
public class ThreadPoolConfig {
|
||||
|
||||
@Bean(name = "taskExecutor")
|
||||
public ThreadPoolTaskExecutor taskExecutor() {
|
||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||
// 核心线程数
|
||||
executor.setCorePoolSize(10);
|
||||
// 最大线程数
|
||||
executor.setMaxPoolSize(50);
|
||||
// 队列容量
|
||||
executor.setQueueCapacity(200);
|
||||
// 线程活跃时间(秒)
|
||||
executor.setKeepAliveSeconds(30);
|
||||
// 线程名前缀
|
||||
executor.setThreadNamePrefix("NoteExecutor-");
|
||||
|
||||
// 拒绝策略:由调用线程处理(一般为主线程)
|
||||
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
|
||||
|
||||
// 等待所有任务结束后再关闭线程池
|
||||
executor.setWaitForTasksToCompleteOnShutdown(true);
|
||||
// 设置等待时间,如果超过这个时间还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是被没有完成的任务阻塞
|
||||
executor.setAwaitTerminationSeconds(60);
|
||||
|
||||
executor.initialize();
|
||||
return executor;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,94 @@
|
||||
package com.hanserwei.hannote.comment.biz.consumer;
|
||||
|
||||
import com.google.common.util.concurrent.RateLimiter;
|
||||
import com.hanserwei.hannote.comment.biz.constants.MQConstants;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
@SuppressWarnings("UnstableApiUsage")
|
||||
@Component
|
||||
@Slf4j
|
||||
public class Comment2DBConsumer {
|
||||
|
||||
@Value("${rocketmq.name-server}")
|
||||
private String nameServer;
|
||||
|
||||
private DefaultMQPushConsumer consumer;
|
||||
|
||||
// 每秒创建 1000 个令牌
|
||||
private final RateLimiter rateLimiter = RateLimiter.create(1000);
|
||||
|
||||
@Bean
|
||||
public DefaultMQPushConsumer mqPushConsumer() throws MQClientException {
|
||||
// Group组
|
||||
String group = "han_note_group_" + MQConstants.TOPIC_PUBLISH_COMMENT;
|
||||
|
||||
// 创建一个新的DefaultMQPushConsumer示例并指定消费者的消费组名
|
||||
consumer = new DefaultMQPushConsumer(group);
|
||||
|
||||
// 设置NameServer地址
|
||||
consumer.setNamesrvAddr(nameServer);
|
||||
|
||||
// 订阅指定的主题,并设置主题的订阅规则("*" 表示订阅所有标签的消息)
|
||||
consumer.subscribe(MQConstants.TOPIC_PUBLISH_COMMENT, "*");
|
||||
|
||||
// 设置消费者消费消息的起始位置,如果队列中没有消息,则从最新的消息开始消费。
|
||||
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
|
||||
|
||||
// 设置消息消费模式,这里使用集群模式 (CLUSTERING)
|
||||
consumer.setMessageModel(MessageModel.CLUSTERING);
|
||||
|
||||
// 设置每批次消费的最大消息数量,这里设置为 30,表示每次拉取时最多消费 30 条消息
|
||||
consumer.setConsumeMessageBatchMaxSize(30);
|
||||
|
||||
// 注册消息监听器
|
||||
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
|
||||
log.info("==> 本批次消息大小: {}", msgs.size());
|
||||
try {
|
||||
// 令牌桶流控
|
||||
rateLimiter.acquire();
|
||||
|
||||
for (MessageExt msg : msgs) {
|
||||
String message = new String(msg.getBody());
|
||||
log.info("==> Consumer - Received message: {}", message);
|
||||
|
||||
// TODO: 业务处理
|
||||
}
|
||||
|
||||
// 手动 ACK,告诉 RocketMQ 这批次消息消费成功
|
||||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||
} catch (Exception e) {
|
||||
log.error("", e);
|
||||
// 手动 ACK,告诉 RocketMQ 这批次消息处理失败,稍后再进行重试
|
||||
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
|
||||
}
|
||||
});
|
||||
|
||||
// 启动消费者
|
||||
consumer.start();
|
||||
return consumer;
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void destroy() {
|
||||
if (Objects.nonNull(consumer)) {
|
||||
try {
|
||||
consumer.shutdown(); // 关闭消费者
|
||||
} catch (Exception e) {
|
||||
log.error("", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,87 @@
|
||||
package com.hanserwei.hannote.comment.biz.retry;
|
||||
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.client.producer.SendCallback;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.retry.RetryCallback;
|
||||
import org.springframework.retry.support.RetryTemplate;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
public class SendMqRetryHelper {
|
||||
|
||||
@Resource
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
@Resource
|
||||
private RetryTemplate retryTemplate;
|
||||
@Resource(name = "taskExecutor")
|
||||
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
|
||||
|
||||
/**
|
||||
* 异步发送 MQ
|
||||
*
|
||||
* @param topic MQ topic
|
||||
*/
|
||||
public void asyncSend(String topic, String body) {
|
||||
log.info("==> 开始异步发送 MQ, Topic: {}, Body: {}", topic, body);
|
||||
|
||||
// 构建消息对象,并将 DTO 转成 Json 字符串设置到消息体中
|
||||
Message<String> message = MessageBuilder.withPayload(body)
|
||||
.build();
|
||||
|
||||
// 异步发送 MQ 消息,提升接口响应速度
|
||||
rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
log.info("==> 【评论发布】MQ 发送成功,SendResult: {}", sendResult);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable throwable) {
|
||||
log.error("==> 【评论发布】MQ 发送异常: ", throwable);
|
||||
handleRetry(topic, message);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 重试处理
|
||||
*
|
||||
* @param topic MQ topic
|
||||
* @param message 消息对象
|
||||
*/
|
||||
private void handleRetry(String topic, Message<String> message) {
|
||||
// 异步处理
|
||||
threadPoolTaskExecutor.submit(() -> {
|
||||
try {
|
||||
// 通过 retryTemplate 执行重试
|
||||
retryTemplate.execute((RetryCallback<Void, RuntimeException>) context -> {
|
||||
log.info("==> 开始重试 MQ 发送, 当前重试次数: {}, 时间: {}", context.getRetryCount() + 1, LocalDateTime.now());
|
||||
// 同步发送 MQ
|
||||
rocketMQTemplate.syncSend(topic, message);
|
||||
return null;
|
||||
});
|
||||
} catch (Exception e) {
|
||||
// 多次重试失败,进入兜底方案
|
||||
fallback(e, topic, message.getPayload());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 兜底方案: 将发送失败的 MQ 写入数据库,之后,通过定时任务扫表,将发送失败的 MQ 再次发送,最终发送成功后,将该记录物理删除
|
||||
*/
|
||||
private void fallback(Exception e, String topic, String bodyJson) {
|
||||
log.error("==> 多次发送失败, 进入兜底方案, Topic: {}, bodyJson: {}", topic, bodyJson);
|
||||
|
||||
// TODO:
|
||||
}
|
||||
}
|
||||
@@ -10,15 +10,11 @@ import com.hanserwei.hannote.comment.biz.domain.dataobject.CommentDO;
|
||||
import com.hanserwei.hannote.comment.biz.domain.mapper.CommentDOMapper;
|
||||
import com.hanserwei.hannote.comment.biz.model.dto.PublishCommentMqDTO;
|
||||
import com.hanserwei.hannote.comment.biz.model.vo.PublishCommentReqVO;
|
||||
import com.hanserwei.hannote.comment.biz.retry.SendMqRetryHelper;
|
||||
import com.hanserwei.hannote.comment.biz.service.CommentService;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.rocketmq.client.producer.SendCallback;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
@@ -28,7 +24,7 @@ import java.time.LocalDateTime;
|
||||
public class CommentServiceImpl extends ServiceImpl<CommentDOMapper, CommentDO> implements CommentService {
|
||||
|
||||
@Resource
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
private SendMqRetryHelper sendMqRetryHelper;
|
||||
|
||||
@Override
|
||||
public Response<?> publishComment(PublishCommentReqVO publishCommentReqVO) {
|
||||
@@ -54,22 +50,8 @@ public class CommentServiceImpl extends ServiceImpl<CommentDOMapper, CommentDO>
|
||||
.createTime(LocalDateTime.now())
|
||||
.creatorId(creatorId)
|
||||
.build();
|
||||
// 构建消息对象,并将 DTO 转成 Json 字符串设置到消息体中
|
||||
Message<String> message = MessageBuilder
|
||||
.withPayload(JsonUtils.toJsonString(publishCommentMqDTO))
|
||||
.build();
|
||||
// 异步发送 MQ 消息,提升接口响应速度
|
||||
rocketMQTemplate.asyncSend(MQConstants.TOPIC_PUBLISH_COMMENT, message, new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
log.info("==> 【评论发布】MQ 发送成功,SendResult: {}", sendResult);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable throwable) {
|
||||
log.error("==> 【评论发布】MQ 发送异常: ", throwable);
|
||||
}
|
||||
});
|
||||
// 发送 MQ 消息,包含重试机制
|
||||
sendMqRetryHelper.asyncSend(MQConstants.TOPIC_PUBLISH_COMMENT, JsonUtils.toJsonString(publishCommentMqDTO));
|
||||
return Response.success();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,4 +14,8 @@ mybatis-plus:
|
||||
log-impl: org.apache.ibatis.logging.slf4j.Slf4jImpl
|
||||
global-config:
|
||||
banner: false
|
||||
mapper-locations: classpath*:/mapperxml/*.xml
|
||||
mapper-locations: classpath*:/mapperxml/*.xml
|
||||
retry:
|
||||
max-attempts: 3 # 最大重试次数
|
||||
init-interval: 1000 # 初始延迟时间,单位 ms
|
||||
multiplier: 2 # 每次重试间隔加倍(每次乘以 2)
|
||||
|
||||
@@ -0,0 +1,46 @@
|
||||
package com.hanserwei.hannote.comment.biz;
|
||||
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.client.producer.SendCallback;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
|
||||
@SpringBootTest
|
||||
@Slf4j
|
||||
class MQTests {
|
||||
|
||||
@Resource
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
/**
|
||||
* 测试:模拟发送评论发布消息
|
||||
*/
|
||||
@Test
|
||||
void testBatchSendMQ() {
|
||||
for (long i = 0; i < 1620; i++) {
|
||||
|
||||
// 构建消息对象
|
||||
Message<String> message = MessageBuilder.withPayload("消息体数据")
|
||||
.build();
|
||||
|
||||
// 异步发送 MQ 消息
|
||||
rocketMQTemplate.asyncSend("PublishCommentTopic", message, new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
log.info("==> 【评论发布】MQ 发送成功,SendResult: {}", sendResult);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable throwable) {
|
||||
log.error("==> 【评论发布】MQ 发送异常: ", throwable);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -298,6 +298,6 @@ Authorization: Bearer {{token}}
|
||||
|
||||
{
|
||||
"noteId": 1862481582414102549,
|
||||
"content": "这是一个评论",
|
||||
"content": "这是第三个评论,测试一下异步消息重试",
|
||||
"imageUrl": "https://cdn.pixabay.com/photo/2025/10/05/15/06/autumn-9875155_1280.jpg"
|
||||
}
|
||||
Reference in New Issue
Block a user