diff --git a/han-note-comment/han-note-comment-biz/pom.xml b/han-note-comment/han-note-comment-biz/pom.xml index a3cfd2a..8de0fc3 100644 --- a/han-note-comment/han-note-comment-biz/pom.xml +++ b/han-note-comment/han-note-comment-biz/pom.xml @@ -97,6 +97,12 @@ spring-boot-starter-aop + + + org.apache.rocketmq + rocketmq-client + + diff --git a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/consumer/Comment2DBConsumer.java b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/consumer/Comment2DBConsumer.java new file mode 100644 index 0000000..81a895c --- /dev/null +++ b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/consumer/Comment2DBConsumer.java @@ -0,0 +1,94 @@ +package com.hanserwei.hannote.comment.biz.consumer; + +import com.google.common.util.concurrent.RateLimiter; +import com.hanserwei.hannote.comment.biz.constants.MQConstants; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +import java.util.Objects; + +@SuppressWarnings("UnstableApiUsage") +@Component +@Slf4j +public class Comment2DBConsumer { + + @Value("${rocketmq.name-server}") + private String nameServer; + + private DefaultMQPushConsumer consumer; + + // 每秒创建 1000 个令牌 + private final RateLimiter rateLimiter = RateLimiter.create(1000); + + @Bean + public DefaultMQPushConsumer mqPushConsumer() throws MQClientException { + // Group组 + String group = "han_note_group_" + MQConstants.TOPIC_PUBLISH_COMMENT; + + // 创建一个新的DefaultMQPushConsumer示例并指定消费者的消费组名 + consumer = new DefaultMQPushConsumer(group); + + // 设置NameServer地址 + consumer.setNamesrvAddr(nameServer); + + // 订阅指定的主题,并设置主题的订阅规则("*" 表示订阅所有标签的消息) + consumer.subscribe(MQConstants.TOPIC_PUBLISH_COMMENT, "*"); + + // 设置消费者消费消息的起始位置,如果队列中没有消息,则从最新的消息开始消费。 + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); + + // 设置消息消费模式,这里使用集群模式 (CLUSTERING) + consumer.setMessageModel(MessageModel.CLUSTERING); + + // 设置每批次消费的最大消息数量,这里设置为 30,表示每次拉取时最多消费 30 条消息 + consumer.setConsumeMessageBatchMaxSize(30); + + // 注册消息监听器 + consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { + log.info("==> 本批次消息大小: {}", msgs.size()); + try { + // 令牌桶流控 + rateLimiter.acquire(); + + for (MessageExt msg : msgs) { + String message = new String(msg.getBody()); + log.info("==> Consumer - Received message: {}", message); + + // TODO: 业务处理 + } + + // 手动 ACK,告诉 RocketMQ 这批次消息消费成功 + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } catch (Exception e) { + log.error("", e); + // 手动 ACK,告诉 RocketMQ 这批次消息处理失败,稍后再进行重试 + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + }); + + // 启动消费者 + consumer.start(); + return consumer; + } + + @PreDestroy + public void destroy() { + if (Objects.nonNull(consumer)) { + try { + consumer.shutdown(); // 关闭消费者 + } catch (Exception e) { + log.error("", e); + } + } + } +} diff --git a/han-note-comment/han-note-comment-biz/src/test/java/com/hanserwei/hannote/comment/biz/MQTests.java b/han-note-comment/han-note-comment-biz/src/test/java/com/hanserwei/hannote/comment/biz/MQTests.java new file mode 100644 index 0000000..d148cef --- /dev/null +++ b/han-note-comment/han-note-comment-biz/src/test/java/com/hanserwei/hannote/comment/biz/MQTests.java @@ -0,0 +1,46 @@ +package com.hanserwei.hannote.comment.biz; + +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +@SpringBootTest +@Slf4j +class MQTests { + + @Resource + private RocketMQTemplate rocketMQTemplate; + + /** + * 测试:模拟发送评论发布消息 + */ + @Test + void testBatchSendMQ() { + for (long i = 0; i < 1620; i++) { + + // 构建消息对象 + Message message = MessageBuilder.withPayload("消息体数据") + .build(); + + // 异步发送 MQ 消息 + rocketMQTemplate.asyncSend("PublishCommentTopic", message, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + log.info("==> 【评论发布】MQ 发送成功,SendResult: {}", sendResult); + } + + @Override + public void onException(Throwable throwable) { + log.error("==> 【评论发布】MQ 发送异常: ", throwable); + } + }); + } + } + +} \ No newline at end of file