diff --git a/han-note-count/han-note-count-biz/pom.xml b/han-note-count/han-note-count-biz/pom.xml index ecc3570..cf6a488 100644 --- a/han-note-count/han-note-count-biz/pom.xml +++ b/han-note-count/han-note-count-biz/pom.xml @@ -97,6 +97,13 @@ rocketmq-spring-boot-starter + + + com.github.phantomthief + buffer-trigger + + + diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountFansConsumer.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountFansConsumer.java index 2e1e149..fbcbaa8 100644 --- a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountFansConsumer.java +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountFansConsumer.java @@ -1,11 +1,16 @@ package com.hanserwei.hannote.count.biz.consumer; +import com.github.phantomthief.collection.BufferTrigger; +import com.hanserwei.framework.common.utils.JsonUtils; import com.hanserwei.hannote.count.biz.constant.MQConstants; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; +import java.time.Duration; +import java.util.List; + @Component @RocketMQMessageListener( consumerGroup = "han_note_group_" + MQConstants.TOPIC_COUNT_FANS, @@ -13,8 +18,20 @@ import org.springframework.stereotype.Component; ) @Slf4j public class CountFansConsumer implements RocketMQListener { + private final BufferTrigger bufferTrigger = BufferTrigger.batchBlocking() + .bufferSize(50000) // 缓存队列的最大容量 + .batchSize(1000) // 一批次最多聚合 1000 条 + .linger(Duration.ofSeconds(1)) // 多久聚合一次 + .setConsumerEx(this::consumeMessage) + .build(); @Override public void onMessage(String body) { - log.info("## 消费了 MQ [计数:粉丝数]: {}", body); + // 往 bufferTrigger 中添加元素 + bufferTrigger.enqueue(body); + } + + private void consumeMessage(List bodys) { + log.info("==> 聚合消息, size: {}", bodys.size()); + log.info("==> 聚合消息, {}", JsonUtils.toJsonString(bodys)); } } diff --git a/han-note-user-relation/han-note-user-relation-biz/src/test/java/com/hanserwei/hannote/user/relation/biz/MQTests.java b/han-note-user-relation/han-note-user-relation-biz/src/test/java/com/hanserwei/hannote/user/relation/biz/MQTests.java index 364414d..8d0e3e7 100644 --- a/han-note-user-relation/han-note-user-relation-biz/src/test/java/com/hanserwei/hannote/user/relation/biz/MQTests.java +++ b/han-note-user-relation/han-note-user-relation-biz/src/test/java/com/hanserwei/hannote/user/relation/biz/MQTests.java @@ -2,6 +2,8 @@ package com.hanserwei.hannote.user.relation.biz; import com.hanserwei.framework.common.utils.JsonUtils; import com.hanserwei.hannote.user.relation.biz.constant.MQConstants; +import com.hanserwei.hannote.user.relation.biz.enums.FollowUnfollowTypeEnum; +import com.hanserwei.hannote.user.relation.biz.model.dto.CountFollowUnfollowMqDTO; import com.hanserwei.hannote.user.relation.biz.model.dto.FollowUserMqDTO; import com.hanserwei.hannote.user.relation.biz.model.dto.UnfollowUserMqDTO; import jakarta.annotation.Resource; @@ -125,4 +127,37 @@ class MQTests { } } + /** + * 测试:发送计数 MQ, 以统计粉丝数 + */ + @Test + void testSendCountFollowUnfollowMQ() { + // 循环发送 3200 条 MQ + for (long i = 0; i < 3200; i++) { + // 构建消息体 DTO + CountFollowUnfollowMqDTO countFollowUnfollowMqDTO = CountFollowUnfollowMqDTO.builder() + .userId(i + 1) // 关注者用户 ID + .targetUserId(27L) // 目标用户 + .type(FollowUnfollowTypeEnum.FOLLOW.getCode()) + .build(); + + // 构建消息对象,并将 DTO 转成 Json 字符串设置到消息体中 + org.springframework.messaging.Message message = MessageBuilder.withPayload(JsonUtils.toJsonString(countFollowUnfollowMqDTO)) + .build(); + + // 发送 MQ 通知计数服务:统计粉丝数 + rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_FANS, message, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + log.info("==> 【计数服务:粉丝数】MQ 发送成功,SendResult: {}", sendResult); + } + + @Override + public void onException(Throwable throwable) { + log.error("==> 【计数服务:粉丝数】MQ 发送异常: ", throwable); + } + }); + } + + } } \ No newline at end of file diff --git a/pom.xml b/pom.xml index 47f3af7..98df385 100755 --- a/pom.xml +++ b/pom.xml @@ -64,6 +64,7 @@ 3.9.4 2.3.4 5.3.2 + 0.2.21 @@ -285,6 +286,12 @@ rocketmq-acl ${rocketmq-client.version} + + + com.github.phantomthief + buffer-trigger + ${buffertrigger.version} +