diff --git a/han-note-comment/han-note-comment-biz/pom.xml b/han-note-comment/han-note-comment-biz/pom.xml index ebec865..a3cfd2a 100644 --- a/han-note-comment/han-note-comment-biz/pom.xml +++ b/han-note-comment/han-note-comment-biz/pom.xml @@ -85,6 +85,19 @@ rocketmq-spring-boot-starter + + + org.springframework.retry + spring-retry + + + + + org.springframework.boot + spring-boot-starter-aop + + + diff --git a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/HannoteCommentApplication.java b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/HannoteCommentApplication.java index 87982ff..9e0938d 100644 --- a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/HannoteCommentApplication.java +++ b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/HannoteCommentApplication.java @@ -3,9 +3,11 @@ package com.hanserwei.hannote.comment.biz; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.retry.annotation.EnableRetry; @SpringBootApplication @MapperScan("com.hanserwei.hannote.comment.biz.domain.mapper") +@EnableRetry public class HannoteCommentApplication { public static void main(String[] args) { SpringApplication.run(HannoteCommentApplication.class, args); diff --git a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/config/RetryConfig.java b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/config/RetryConfig.java new file mode 100644 index 0000000..7687da3 --- /dev/null +++ b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/config/RetryConfig.java @@ -0,0 +1,34 @@ +package com.hanserwei.hannote.comment.biz.config; + +import jakarta.annotation.Resource; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.retry.backoff.ExponentialBackOffPolicy; +import org.springframework.retry.policy.SimpleRetryPolicy; +import org.springframework.retry.support.RetryTemplate; + +@Configuration +public class RetryConfig { + + @Resource + private RetryProperties retryProperties; + + @Bean + public RetryTemplate retryTemplate() { + RetryTemplate retryTemplate = new RetryTemplate(); + + // 定义重试策略 + SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(); + retryPolicy.setMaxAttempts(retryProperties.getMaxAttempts());// 最大重试次数 + + // 定义间隔策略 + ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); + backOffPolicy.setInitialInterval(retryProperties.getInitInterval()); // 初始间隔时间 + backOffPolicy.setMultiplier(retryProperties.getMultiplier()); // 延迟倍数,默认为2 + + retryTemplate.setRetryPolicy(retryPolicy); + retryTemplate.setBackOffPolicy(backOffPolicy); + + return retryTemplate; + } +} diff --git a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/config/RetryProperties.java b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/config/RetryProperties.java new file mode 100644 index 0000000..0ece649 --- /dev/null +++ b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/config/RetryProperties.java @@ -0,0 +1,29 @@ +package com.hanserwei.hannote.comment.biz.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +@ConfigurationProperties(prefix = RetryProperties.PREFIX) +@Component +@Data +public class RetryProperties { + + public static final String PREFIX = "retry"; + + /** + * 最大重试次数 + */ + private Integer maxAttempts = 3; + + /** + * 初始间隔时间,单位 ms + */ + private Integer initInterval = 1000; + + /** + * 乘积(每次乘以 2) + */ + private Double multiplier = 2.0; + +} \ No newline at end of file diff --git a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/config/ThreadPoolConfig.java b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/config/ThreadPoolConfig.java new file mode 100644 index 0000000..6e03393 --- /dev/null +++ b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/config/ThreadPoolConfig.java @@ -0,0 +1,37 @@ +package com.hanserwei.hannote.comment.biz.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.ThreadPoolExecutor; + +@Configuration +public class ThreadPoolConfig { + + @Bean(name = "taskExecutor") + public ThreadPoolTaskExecutor taskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + // 核心线程数 + executor.setCorePoolSize(10); + // 最大线程数 + executor.setMaxPoolSize(50); + // 队列容量 + executor.setQueueCapacity(200); + // 线程活跃时间(秒) + executor.setKeepAliveSeconds(30); + // 线程名前缀 + executor.setThreadNamePrefix("NoteExecutor-"); + + // 拒绝策略:由调用线程处理(一般为主线程) + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + + // 等待所有任务结束后再关闭线程池 + executor.setWaitForTasksToCompleteOnShutdown(true); + // 设置等待时间,如果超过这个时间还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是被没有完成的任务阻塞 + executor.setAwaitTerminationSeconds(60); + + executor.initialize(); + return executor; + } +} diff --git a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/retry/SendMqRetryHelper.java b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/retry/SendMqRetryHelper.java new file mode 100644 index 0000000..e1f7463 --- /dev/null +++ b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/retry/SendMqRetryHelper.java @@ -0,0 +1,87 @@ +package com.hanserwei.hannote.comment.biz.retry; + +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.retry.RetryCallback; +import org.springframework.retry.support.RetryTemplate; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; + +@Component +@Slf4j +public class SendMqRetryHelper { + + @Resource + private RocketMQTemplate rocketMQTemplate; + @Resource + private RetryTemplate retryTemplate; + @Resource(name = "taskExecutor") + private ThreadPoolTaskExecutor threadPoolTaskExecutor; + + /** + * 异步发送 MQ + * + * @param topic MQ topic + */ + public void asyncSend(String topic, String body) { + log.info("==> 开始异步发送 MQ, Topic: {}, Body: {}", topic, body); + + // 构建消息对象,并将 DTO 转成 Json 字符串设置到消息体中 + Message message = MessageBuilder.withPayload(body) + .build(); + + // 异步发送 MQ 消息,提升接口响应速度 + rocketMQTemplate.asyncSend(topic, message, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + log.info("==> 【评论发布】MQ 发送成功,SendResult: {}", sendResult); + } + + @Override + public void onException(Throwable throwable) { + log.error("==> 【评论发布】MQ 发送异常: ", throwable); + handleRetry(topic, message); + } + }); + } + + /** + * 重试处理 + * + * @param topic MQ topic + * @param message 消息对象 + */ + private void handleRetry(String topic, Message message) { + // 异步处理 + threadPoolTaskExecutor.submit(() -> { + try { + // 通过 retryTemplate 执行重试 + retryTemplate.execute((RetryCallback) context -> { + log.info("==> 开始重试 MQ 发送, 当前重试次数: {}, 时间: {}", context.getRetryCount() + 1, LocalDateTime.now()); + // 同步发送 MQ + rocketMQTemplate.syncSend(topic, message); + return null; + }); + } catch (Exception e) { + // 多次重试失败,进入兜底方案 + fallback(e, topic, message.getPayload()); + } + }); + } + + /** + * 兜底方案: 将发送失败的 MQ 写入数据库,之后,通过定时任务扫表,将发送失败的 MQ 再次发送,最终发送成功后,将该记录物理删除 + */ + private void fallback(Exception e, String topic, String bodyJson) { + log.error("==> 多次发送失败, 进入兜底方案, Topic: {}, bodyJson: {}", topic, bodyJson); + + // TODO: + } +} diff --git a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/service/impl/CommentServiceImpl.java b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/service/impl/CommentServiceImpl.java index 50f8de5..91eef53 100644 --- a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/service/impl/CommentServiceImpl.java +++ b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/service/impl/CommentServiceImpl.java @@ -10,15 +10,11 @@ import com.hanserwei.hannote.comment.biz.domain.dataobject.CommentDO; import com.hanserwei.hannote.comment.biz.domain.mapper.CommentDOMapper; import com.hanserwei.hannote.comment.biz.model.dto.PublishCommentMqDTO; import com.hanserwei.hannote.comment.biz.model.vo.PublishCommentReqVO; +import com.hanserwei.hannote.comment.biz.retry.SendMqRetryHelper; import com.hanserwei.hannote.comment.biz.service.CommentService; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.client.producer.SendCallback; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.spring.core.RocketMQTemplate; -import org.springframework.messaging.Message; -import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import java.time.LocalDateTime; @@ -28,7 +24,7 @@ import java.time.LocalDateTime; public class CommentServiceImpl extends ServiceImpl implements CommentService { @Resource - private RocketMQTemplate rocketMQTemplate; + private SendMqRetryHelper sendMqRetryHelper; @Override public Response publishComment(PublishCommentReqVO publishCommentReqVO) { @@ -54,22 +50,8 @@ public class CommentServiceImpl extends ServiceImpl .createTime(LocalDateTime.now()) .creatorId(creatorId) .build(); - // 构建消息对象,并将 DTO 转成 Json 字符串设置到消息体中 - Message message = MessageBuilder - .withPayload(JsonUtils.toJsonString(publishCommentMqDTO)) - .build(); - // 异步发送 MQ 消息,提升接口响应速度 - rocketMQTemplate.asyncSend(MQConstants.TOPIC_PUBLISH_COMMENT, message, new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - log.info("==> 【评论发布】MQ 发送成功,SendResult: {}", sendResult); - } - - @Override - public void onException(Throwable throwable) { - log.error("==> 【评论发布】MQ 发送异常: ", throwable); - } - }); + // 发送 MQ 消息,包含重试机制 + sendMqRetryHelper.asyncSend(MQConstants.TOPIC_PUBLISH_COMMENT, JsonUtils.toJsonString(publishCommentMqDTO)); return Response.success(); } } diff --git a/han-note-comment/han-note-comment-biz/src/main/resources/application.yml b/han-note-comment/han-note-comment-biz/src/main/resources/application.yml index 3dc7d16..9d744ac 100644 --- a/han-note-comment/han-note-comment-biz/src/main/resources/application.yml +++ b/han-note-comment/han-note-comment-biz/src/main/resources/application.yml @@ -14,4 +14,8 @@ mybatis-plus: log-impl: org.apache.ibatis.logging.slf4j.Slf4jImpl global-config: banner: false - mapper-locations: classpath*:/mapperxml/*.xml \ No newline at end of file + mapper-locations: classpath*:/mapperxml/*.xml +retry: + max-attempts: 3 # 最大重试次数 + init-interval: 1000 # 初始延迟时间,单位 ms + multiplier: 2 # 每次重试间隔加倍(每次乘以 2) diff --git a/http-client/gateApi.http b/http-client/gateApi.http index 3c71c8a..1317ad0 100644 --- a/http-client/gateApi.http +++ b/http-client/gateApi.http @@ -298,6 +298,6 @@ Authorization: Bearer {{token}} { "noteId": 1862481582414102549, - "content": "这是一个评论", + "content": "这是第三个评论,测试一下异步消息重试", "imageUrl": "https://cdn.pixabay.com/photo/2025/10/05/15/06/autumn-9875155_1280.jpg" } \ No newline at end of file