feat(comment): 实现评论热度同步到 Redis ZSet

- 新增 Lua 脚本支持热点评论添加与更新
- 在评论消费端同步一级评论至 Redis 热点评论 ZSet
- 支持批量更新评论热度并维护 Redis 中的 Top 500 热点评论
- 修改 CommentDO 和 CommentHeatBO 模型,增加 noteId 字段以支持按笔记分组
- 调整 Mapper XML 查询字段,补充 note_id 字段用于构建 Redis Key
- 优化 Redis 脚本执行逻辑,确保线程安全及数据一致性
- 更新 HTTP 测试用例内容,验证 Redis 同步功能正确性
This commit is contained in:
2025-11-08 15:48:18 +08:00
parent 85e6bab079
commit bd775b805c
7 changed files with 175 additions and 7 deletions

View File

@@ -6,6 +6,7 @@ import com.google.common.collect.Maps;
import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.RateLimiter;
import com.hanserwei.framework.common.utils.JsonUtils; import com.hanserwei.framework.common.utils.JsonUtils;
import com.hanserwei.hannote.comment.biz.constants.MQConstants; import com.hanserwei.hannote.comment.biz.constants.MQConstants;
import com.hanserwei.hannote.comment.biz.constants.RedisKeyConstants;
import com.hanserwei.hannote.comment.biz.domain.dataobject.CommentDO; import com.hanserwei.hannote.comment.biz.domain.dataobject.CommentDO;
import com.hanserwei.hannote.comment.biz.domain.mapper.CommentDOMapper; import com.hanserwei.hannote.comment.biz.domain.mapper.CommentDOMapper;
import com.hanserwei.hannote.comment.biz.enums.CommentLevelEnum; import com.hanserwei.hannote.comment.biz.enums.CommentLevelEnum;
@@ -29,14 +30,15 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionTemplate; import org.springframework.transaction.support.TransactionTemplate;
import java.util.List; import java.util.*;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@SuppressWarnings("UnstableApiUsage") @SuppressWarnings("UnstableApiUsage")
@@ -55,6 +57,8 @@ public class Comment2DBConsumer {
private KeyValueRpcService keyValueRpcService; private KeyValueRpcService keyValueRpcService;
@Resource @Resource
private RocketMQTemplate rocketMQTemplate; private RocketMQTemplate rocketMQTemplate;
@Resource
private RedisTemplate<String, Object> redisTemplate;
private DefaultMQPushConsumer consumer; private DefaultMQPushConsumer consumer;
@@ -213,6 +217,9 @@ public class Comment2DBConsumer {
org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(JsonUtils.toJsonString(countPublishCommentMqDTOS)) org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(JsonUtils.toJsonString(countPublishCommentMqDTOS))
.build(); .build();
// 同步一级评论到 Redis 热点评论 ZSET 中
syncOneLevelComment2RedisZSet(commentBOS);
// 异步发送 MQ 消息 // 异步发送 MQ 消息
rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_COMMENT, message, new SendCallback() { rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_COMMENT, message, new SendCallback() {
@Override @Override
@@ -241,6 +248,40 @@ public class Comment2DBConsumer {
return consumer; return consumer;
} }
/**
* 同步一级评论到 Redis 热点评论 ZSET 中
*
* @param commentBOS 评论 BO 列表
*/
private void syncOneLevelComment2RedisZSet(List<CommentBO> commentBOS) {
// 过滤出一级评论,并按所属笔记进行分组,转换为一个 Map 字典
Map<Long, List<CommentBO>> commentIdAndBOListMap = commentBOS.stream()
.filter(commentBO -> Objects.equals(commentBO.getLevel(), CommentLevelEnum.ONE.getCode())) // 仅过滤一级评论
.collect(Collectors.groupingBy(CommentBO::getNoteId));
// 循环字典
commentIdAndBOListMap.forEach((noteId, commentBOList) -> {
// 构建 Redis 热点评论 ZSET Key
String key = RedisKeyConstants.buildCommentListKey(noteId);
DefaultRedisScript<Long> script = new DefaultRedisScript<>();
// Lua 脚本路径
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/lua/add_hot_comments.lua")));
// 返回值类型
script.setResultType(Long.class);
// 构建执行 Lua 脚本所需的 ARGS 参数
List<Object> args = Lists.newArrayList();
commentBOList.forEach(commentBO -> {
args.add(commentBO.getId()); // Member: 评论ID
args.add(0); // Score: 热度值,初始值为 0
});
// 执行 Lua 脚本
redisTemplate.execute(script, Collections.singletonList(key), args.toArray());
});
}
@PreDestroy @PreDestroy
public void destroy() { public void destroy() {
if (Objects.nonNull(consumer)) { if (Objects.nonNull(consumer)) {

View File

@@ -5,6 +5,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.hanserwei.framework.common.utils.JsonUtils; import com.hanserwei.framework.common.utils.JsonUtils;
import com.hanserwei.hannote.comment.biz.constants.MQConstants; import com.hanserwei.hannote.comment.biz.constants.MQConstants;
import com.hanserwei.hannote.comment.biz.constants.RedisKeyConstants;
import com.hanserwei.hannote.comment.biz.domain.dataobject.CommentDO; import com.hanserwei.hannote.comment.biz.domain.dataobject.CommentDO;
import com.hanserwei.hannote.comment.biz.domain.mapper.CommentDOMapper; import com.hanserwei.hannote.comment.biz.domain.mapper.CommentDOMapper;
import com.hanserwei.hannote.comment.biz.model.bo.CommentHeatBO; import com.hanserwei.hannote.comment.biz.model.bo.CommentHeatBO;
@@ -13,12 +14,19 @@ import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.time.Duration; import java.time.Duration;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
@Component @Component
@RocketMQMessageListener(consumerGroup = "han_note_group_" + MQConstants.TOPIC_COMMENT_HEAT_UPDATE, // Group 组 @RocketMQMessageListener(consumerGroup = "han_note_group_" + MQConstants.TOPIC_COMMENT_HEAT_UPDATE, // Group 组
@@ -29,6 +37,8 @@ public class CommentHeatUpdateConsumer implements RocketMQListener<String> {
@Resource @Resource
private CommentDOMapper commentDOMapper; private CommentDOMapper commentDOMapper;
@Resource
private RedisTemplate<String, Object> redisTemplate;
private final BufferTrigger<String> bufferTrigger = BufferTrigger.<String>batchBlocking() private final BufferTrigger<String> bufferTrigger = BufferTrigger.<String>batchBlocking()
.bufferSize(50000) // 缓存队列的最大容量 .bufferSize(50000) // 缓存队列的最大容量
@@ -82,9 +92,47 @@ public class CommentHeatUpdateConsumer implements RocketMQListener<String> {
commentBOS.add(CommentHeatBO.builder() commentBOS.add(CommentHeatBO.builder()
.id(commentId) .id(commentId)
.heat(heatNum.doubleValue()) .heat(heatNum.doubleValue())
.noteId(commentDO.getNoteId())
.build()); .build());
}); });
// 批量更新评论热度值 // 批量更新评论热度值
commentDOMapper.batchUpdateHeatByCommentIds(ids, commentBOS); commentDOMapper.batchUpdateHeatByCommentIds(ids, commentBOS);
// 更新 Redis 中热度评论 ZSET
updateRedisHotComments(commentBOS);
}
/**
* 更新 Redis 中热点评论 ZSET
*
* @param commentHeatBOList 热度值 BO 列表
*/
private void updateRedisHotComments(List<CommentHeatBO> commentHeatBOList) {
// 过滤出热度值大于 0 的,并按所属笔记 ID 分组若热度等于0则不进行更新
Map<Long, List<CommentHeatBO>> noteIdAndBOListMap = commentHeatBOList.stream()
.filter(commentHeatBO -> commentHeatBO.getHeat() > 0)
.collect(Collectors.groupingBy(CommentHeatBO::getNoteId));
// 循环
noteIdAndBOListMap.forEach((noteId, commentHeatBOS) -> {
// 构建热点评论 Redis Key
String key = RedisKeyConstants.buildCommentListKey(noteId);
DefaultRedisScript<Long> script = new DefaultRedisScript<>();
// Lua 脚本路径
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/lua/update_hot_comments.lua")));
// 返回值类型
script.setResultType(Long.class);
// 构建执行 Lua 脚本所需的 ARGS 参数
List<Object> args = Lists.newArrayList();
commentHeatBOS.forEach(commentHeatBO -> {
args.add(commentHeatBO.getId()); // Member: 评论ID
args.add(commentHeatBO.getHeat()); // Score: 热度值
});
// 执行 Lua 脚本
redisTemplate.execute(script, Collections.singletonList(key), args.toArray());
});
} }
} }

View File

@@ -19,4 +19,9 @@ public class CommentHeatBO {
* 热度值 * 热度值
*/ */
private Double heat; private Double heat;
/**
* 笔记 ID
*/
private Long noteId;
} }

View File

@@ -0,0 +1,33 @@
-- 操作的 Key
local zsetKey = KEYS[1]
-- 获取传入的成员和分数列表
local membersScores = ARGV
-- ZSet 最多缓存 500 条评论
local sizeLimit = 500
-- 检查 ZSet 是否存在
if redis.call('EXISTS', zsetKey) == 0 then
return -1 -- 若不存在,直接 return
end
-- 获取当前 ZSet 的大小
local currentSize = redis.call('ZCARD', zsetKey)
-- 遍历传入的成员和分数,添加到 ZSet 中
for i = 1, #membersScores, 2 do
-- 评论 ID
local member = membersScores[i]
-- 热度值
local score = membersScores[i + 1]
-- 检查当前 ZSet 的大小是否小于 500 条
if currentSize < sizeLimit then
-- 若是,则添加缓存
redis.call('ZADD', zsetKey, score, member)
currentSize = currentSize + 1 -- 更新 ZSet 大小
else
break -- 否则,则达到最大限制,停止添加
end
end
return 0

View File

@@ -0,0 +1,40 @@
-- 入参说明:
-- KEYS[1]: ZSet 的键
-- ARGV: 每个评论的数据,格式为 member1, score1, member2, score2 ...
local zsetKey = KEYS[1]
local maxSize = 500 -- 最多缓存 500 条热点评论
local batchSize = #ARGV / 2 -- 有多少条评论
-- 确认 ZSet 是否存在
if redis.call("EXISTS", zsetKey) == 0 then
return -1 -- 如果 ZSet 不存在,直接返回
end
for i = 1, batchSize do
local member = ARGV[(i - 1) * 2 + 1] -- 获取当前评论 ID
local score = ARGV[(i - 1) * 2 + 2] -- 获取当前评论的热度
-- 获取 ZSet 的大小
local currentSize = redis.call("ZCARD", zsetKey)
if currentSize < maxSize then
-- 如果 ZSet 的大小小于 maxSize直接添加
redis.call("ZADD", zsetKey, score, member)
else
-- 若已缓存 500 条热点评论
-- 获取当前 ZSet 中热度值最小的评论
local minEntry = redis.call("ZRANGE", zsetKey, 0, 0, "WITHSCORES")
-- 热度最小评论的值
local minScore = minEntry[2]
if score > minScore then
-- 如果当前评论的热度大于最小热度,替换掉最小的;否则无视
redis.call("ZREM", zsetKey, minEntry[1])
redis.call("ZADD", zsetKey, score, member)
end
end
end
return 0

View File

@@ -58,7 +58,8 @@
child_comment_total, child_comment_total,
level, level,
parent_id, parent_id,
heat heat,
note_id
from t_comment from t_comment
where id in where id in
<foreach collection="commentIds" open="(" separator="," close=")" item="commentId"> <foreach collection="commentIds" open="(" separator="," close=")" item="commentId">

View File

@@ -298,9 +298,9 @@ Authorization: Bearer {{token}}
{ {
"noteId": 1862481582414102549, "noteId": 1862481582414102549,
"content": "这是一条测试评论计数的二级评论666", "content": "这是一条测试同步Redis并更新热度的评论",
"imageUrl": "https://cdn.pixabay.com/photo/2025/10/05/15/06/autumn-9875155_1280.jpg", "imageUrl": "https://cdn.pixabay.com/photo/2025/10/05/15/06/autumn-9875155_1280.jpg",
"replyCommentId": 4002 "replyCommentId": 8001
} }
### 批量添加评论 ### 批量添加评论