feat(mq): 实现评论发布消息消费功能
- 新增 Comment2DBConsumer 消费者类,用于处理评论发布消息- 配置 RocketMQ 消费者,订阅 PublishCommentTopic 主题 - 实现消息监听逻辑,支持批量消费和手动确认机制 - 添加令牌桶限流控制,限制每秒处理消息数量 - 集成 Spring Boot 生命周期管理,确保消费者优雅关闭 - 新增 MQ 测试类 MQTests,验证消息发送与消费流程 - 引入 rocketmq-client 依赖以支持 RocketMQ 功能
This commit is contained in:
@@ -97,6 +97,12 @@
|
|||||||
<artifactId>spring-boot-starter-aop</artifactId>
|
<artifactId>spring-boot-starter-aop</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<!-- Rocket MQ 客户端 -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.rocketmq</groupId>
|
||||||
|
<artifactId>rocketmq-client</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,94 @@
|
|||||||
|
package com.hanserwei.hannote.comment.biz.consumer;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.RateLimiter;
|
||||||
|
import com.hanserwei.hannote.comment.biz.constants.MQConstants;
|
||||||
|
import jakarta.annotation.PreDestroy;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||||
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
||||||
|
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
||||||
|
import org.apache.rocketmq.client.exception.MQClientException;
|
||||||
|
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
|
||||||
|
import org.apache.rocketmq.common.message.MessageExt;
|
||||||
|
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
@SuppressWarnings("UnstableApiUsage")
|
||||||
|
@Component
|
||||||
|
@Slf4j
|
||||||
|
public class Comment2DBConsumer {
|
||||||
|
|
||||||
|
@Value("${rocketmq.name-server}")
|
||||||
|
private String nameServer;
|
||||||
|
|
||||||
|
private DefaultMQPushConsumer consumer;
|
||||||
|
|
||||||
|
// 每秒创建 1000 个令牌
|
||||||
|
private final RateLimiter rateLimiter = RateLimiter.create(1000);
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public DefaultMQPushConsumer mqPushConsumer() throws MQClientException {
|
||||||
|
// Group组
|
||||||
|
String group = "han_note_group_" + MQConstants.TOPIC_PUBLISH_COMMENT;
|
||||||
|
|
||||||
|
// 创建一个新的DefaultMQPushConsumer示例并指定消费者的消费组名
|
||||||
|
consumer = new DefaultMQPushConsumer(group);
|
||||||
|
|
||||||
|
// 设置NameServer地址
|
||||||
|
consumer.setNamesrvAddr(nameServer);
|
||||||
|
|
||||||
|
// 订阅指定的主题,并设置主题的订阅规则("*" 表示订阅所有标签的消息)
|
||||||
|
consumer.subscribe(MQConstants.TOPIC_PUBLISH_COMMENT, "*");
|
||||||
|
|
||||||
|
// 设置消费者消费消息的起始位置,如果队列中没有消息,则从最新的消息开始消费。
|
||||||
|
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
|
||||||
|
|
||||||
|
// 设置消息消费模式,这里使用集群模式 (CLUSTERING)
|
||||||
|
consumer.setMessageModel(MessageModel.CLUSTERING);
|
||||||
|
|
||||||
|
// 设置每批次消费的最大消息数量,这里设置为 30,表示每次拉取时最多消费 30 条消息
|
||||||
|
consumer.setConsumeMessageBatchMaxSize(30);
|
||||||
|
|
||||||
|
// 注册消息监听器
|
||||||
|
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
|
||||||
|
log.info("==> 本批次消息大小: {}", msgs.size());
|
||||||
|
try {
|
||||||
|
// 令牌桶流控
|
||||||
|
rateLimiter.acquire();
|
||||||
|
|
||||||
|
for (MessageExt msg : msgs) {
|
||||||
|
String message = new String(msg.getBody());
|
||||||
|
log.info("==> Consumer - Received message: {}", message);
|
||||||
|
|
||||||
|
// TODO: 业务处理
|
||||||
|
}
|
||||||
|
|
||||||
|
// 手动 ACK,告诉 RocketMQ 这批次消息消费成功
|
||||||
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("", e);
|
||||||
|
// 手动 ACK,告诉 RocketMQ 这批次消息处理失败,稍后再进行重试
|
||||||
|
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// 启动消费者
|
||||||
|
consumer.start();
|
||||||
|
return consumer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@PreDestroy
|
||||||
|
public void destroy() {
|
||||||
|
if (Objects.nonNull(consumer)) {
|
||||||
|
try {
|
||||||
|
consumer.shutdown(); // 关闭消费者
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,46 @@
|
|||||||
|
package com.hanserwei.hannote.comment.biz;
|
||||||
|
|
||||||
|
import jakarta.annotation.Resource;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.rocketmq.client.producer.SendCallback;
|
||||||
|
import org.apache.rocketmq.client.producer.SendResult;
|
||||||
|
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.springframework.boot.test.context.SpringBootTest;
|
||||||
|
import org.springframework.messaging.Message;
|
||||||
|
import org.springframework.messaging.support.MessageBuilder;
|
||||||
|
|
||||||
|
@SpringBootTest
|
||||||
|
@Slf4j
|
||||||
|
class MQTests {
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private RocketMQTemplate rocketMQTemplate;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 测试:模拟发送评论发布消息
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
void testBatchSendMQ() {
|
||||||
|
for (long i = 0; i < 1620; i++) {
|
||||||
|
|
||||||
|
// 构建消息对象
|
||||||
|
Message<String> message = MessageBuilder.withPayload("消息体数据")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// 异步发送 MQ 消息
|
||||||
|
rocketMQTemplate.asyncSend("PublishCommentTopic", message, new SendCallback() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(SendResult sendResult) {
|
||||||
|
log.info("==> 【评论发布】MQ 发送成功,SendResult: {}", sendResult);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onException(Throwable throwable) {
|
||||||
|
log.error("==> 【评论发布】MQ 发送异常: ", throwable);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user