feat(count): 引入批量消费机制优化粉丝计数处理
- 添加 BufferTrigger依赖以支持消息聚合 - 实现消息批量消费逻辑,提升处理效率 - 配置批量大小为 1000,缓存队列最大容量为50000 - 设置聚合间隔为1 秒,均衡实时性与性能 - 新增测试用例验证大量消息发送与消费流程 - 日志记录聚合消息内容,便于调试与监控
This commit is contained in:
@@ -97,6 +97,13 @@
|
|||||||
<artifactId>rocketmq-spring-boot-starter</artifactId>
|
<artifactId>rocketmq-spring-boot-starter</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<!-- 快手 Buffer Trigger -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.github.phantomthief</groupId>
|
||||||
|
<artifactId>buffer-trigger</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|||||||
@@ -1,11 +1,16 @@
|
|||||||
package com.hanserwei.hannote.count.biz.consumer;
|
package com.hanserwei.hannote.count.biz.consumer;
|
||||||
|
|
||||||
|
import com.github.phantomthief.collection.BufferTrigger;
|
||||||
|
import com.hanserwei.framework.common.utils.JsonUtils;
|
||||||
import com.hanserwei.hannote.count.biz.constant.MQConstants;
|
import com.hanserwei.hannote.count.biz.constant.MQConstants;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
@Component
|
@Component
|
||||||
@RocketMQMessageListener(
|
@RocketMQMessageListener(
|
||||||
consumerGroup = "han_note_group_" + MQConstants.TOPIC_COUNT_FANS,
|
consumerGroup = "han_note_group_" + MQConstants.TOPIC_COUNT_FANS,
|
||||||
@@ -13,8 +18,20 @@ import org.springframework.stereotype.Component;
|
|||||||
)
|
)
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class CountFansConsumer implements RocketMQListener<String> {
|
public class CountFansConsumer implements RocketMQListener<String> {
|
||||||
|
private final BufferTrigger<String> bufferTrigger = BufferTrigger.<String>batchBlocking()
|
||||||
|
.bufferSize(50000) // 缓存队列的最大容量
|
||||||
|
.batchSize(1000) // 一批次最多聚合 1000 条
|
||||||
|
.linger(Duration.ofSeconds(1)) // 多久聚合一次
|
||||||
|
.setConsumerEx(this::consumeMessage)
|
||||||
|
.build();
|
||||||
@Override
|
@Override
|
||||||
public void onMessage(String body) {
|
public void onMessage(String body) {
|
||||||
log.info("## 消费了 MQ [计数:粉丝数]: {}", body);
|
// 往 bufferTrigger 中添加元素
|
||||||
|
bufferTrigger.enqueue(body);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void consumeMessage(List<String> bodys) {
|
||||||
|
log.info("==> 聚合消息, size: {}", bodys.size());
|
||||||
|
log.info("==> 聚合消息, {}", JsonUtils.toJsonString(bodys));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,6 +2,8 @@ package com.hanserwei.hannote.user.relation.biz;
|
|||||||
|
|
||||||
import com.hanserwei.framework.common.utils.JsonUtils;
|
import com.hanserwei.framework.common.utils.JsonUtils;
|
||||||
import com.hanserwei.hannote.user.relation.biz.constant.MQConstants;
|
import com.hanserwei.hannote.user.relation.biz.constant.MQConstants;
|
||||||
|
import com.hanserwei.hannote.user.relation.biz.enums.FollowUnfollowTypeEnum;
|
||||||
|
import com.hanserwei.hannote.user.relation.biz.model.dto.CountFollowUnfollowMqDTO;
|
||||||
import com.hanserwei.hannote.user.relation.biz.model.dto.FollowUserMqDTO;
|
import com.hanserwei.hannote.user.relation.biz.model.dto.FollowUserMqDTO;
|
||||||
import com.hanserwei.hannote.user.relation.biz.model.dto.UnfollowUserMqDTO;
|
import com.hanserwei.hannote.user.relation.biz.model.dto.UnfollowUserMqDTO;
|
||||||
import jakarta.annotation.Resource;
|
import jakarta.annotation.Resource;
|
||||||
@@ -125,4 +127,37 @@ class MQTests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 测试:发送计数 MQ, 以统计粉丝数
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
void testSendCountFollowUnfollowMQ() {
|
||||||
|
// 循环发送 3200 条 MQ
|
||||||
|
for (long i = 0; i < 3200; i++) {
|
||||||
|
// 构建消息体 DTO
|
||||||
|
CountFollowUnfollowMqDTO countFollowUnfollowMqDTO = CountFollowUnfollowMqDTO.builder()
|
||||||
|
.userId(i + 1) // 关注者用户 ID
|
||||||
|
.targetUserId(27L) // 目标用户
|
||||||
|
.type(FollowUnfollowTypeEnum.FOLLOW.getCode())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// 构建消息对象,并将 DTO 转成 Json 字符串设置到消息体中
|
||||||
|
org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(JsonUtils.toJsonString(countFollowUnfollowMqDTO))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// 发送 MQ 通知计数服务:统计粉丝数
|
||||||
|
rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_FANS, message, new SendCallback() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(SendResult sendResult) {
|
||||||
|
log.info("==> 【计数服务:粉丝数】MQ 发送成功,SendResult: {}", sendResult);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onException(Throwable throwable) {
|
||||||
|
log.error("==> 【计数服务:粉丝数】MQ 发送异常: ", throwable);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
7
pom.xml
7
pom.xml
@@ -64,6 +64,7 @@
|
|||||||
<zookeeper.version>3.9.4</zookeeper.version>
|
<zookeeper.version>3.9.4</zookeeper.version>
|
||||||
<rocketmq-spring-boot.version>2.3.4</rocketmq-spring-boot.version>
|
<rocketmq-spring-boot.version>2.3.4</rocketmq-spring-boot.version>
|
||||||
<rocketmq-client.version>5.3.2</rocketmq-client.version>
|
<rocketmq-client.version>5.3.2</rocketmq-client.version>
|
||||||
|
<buffertrigger.version>0.2.21</buffertrigger.version>
|
||||||
</properties>
|
</properties>
|
||||||
<dependencyManagement>
|
<dependencyManagement>
|
||||||
<dependencies>
|
<dependencies>
|
||||||
@@ -285,6 +286,12 @@
|
|||||||
<artifactId>rocketmq-acl</artifactId>
|
<artifactId>rocketmq-acl</artifactId>
|
||||||
<version>${rocketmq-client.version}</version>
|
<version>${rocketmq-client.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<!-- 快手 Buffer Trigger -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.github.phantomthief</groupId>
|
||||||
|
<artifactId>buffer-trigger</artifactId>
|
||||||
|
<version>${buffertrigger.version}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</dependencyManagement>
|
</dependencyManagement>
|
||||||
|
|||||||
Reference in New Issue
Block a user