feat(comment): 实现评论点赞与取消点赞功能,评论点赞、取消点赞批量写库
- 新增评论点赞布隆过滤器,提升点赞判断性能 - 实现评论点赞与取消点赞的批量操作消费者 - 添加评论点赞状态查询接口及异常处理 - 优化点赞操作合并逻辑,减少数据库访问频率 - 增加评论点赞相关 Lua 脚本支持过期时间设置 - 完善评论点赞 Mapper 层批量插入与删除方法 - 添加评论已点赞业务异常状态码 - 新增测试类用于验证评论点赞 MQ 消费逻辑 - 调整 MQ 消费者 Bean 名称避免冲突 - 更新 HTTP 测试文件中的评论 ID便于调试
This commit is contained in:
@@ -65,7 +65,7 @@ public class Comment2DBConsumer {
|
||||
// 每秒创建 1000 个令牌
|
||||
private final RateLimiter rateLimiter = RateLimiter.create(1000);
|
||||
|
||||
@Bean
|
||||
@Bean(name = "Comment2DBConsumer")
|
||||
public DefaultMQPushConsumer mqPushConsumer() throws MQClientException {
|
||||
// Group组
|
||||
String group = "han_note_group_" + MQConstants.TOPIC_PUBLISH_COMMENT;
|
||||
|
||||
@@ -0,0 +1,156 @@
|
||||
package com.hanserwei.hannote.comment.biz.consumer;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.RateLimiter;
|
||||
import com.hanserwei.framework.common.utils.JsonUtils;
|
||||
import com.hanserwei.hannote.comment.biz.constants.MQConstants;
|
||||
import com.hanserwei.hannote.comment.biz.domain.mapper.CommentLikeDOMapper;
|
||||
import com.hanserwei.hannote.comment.biz.enums.LikeUnlikeCommentTypeEnum;
|
||||
import com.hanserwei.hannote.comment.biz.model.dto.LikeUnlikeCommentMqDTO;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
|
||||
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@SuppressWarnings("UnstableApiUsage")
|
||||
@Component
|
||||
@Slf4j
|
||||
public class LikeUnlikeComment2DBConsumer {
|
||||
|
||||
// 每秒创建 5000 个令牌
|
||||
private final RateLimiter rateLimiter = RateLimiter.create(5000);
|
||||
@Value("${rocketmq.name-server}")
|
||||
private String nameServer;
|
||||
@Resource
|
||||
private CommentLikeDOMapper commentLikeDOMapper;
|
||||
private DefaultMQPushConsumer consumer;
|
||||
|
||||
@Bean(name = "LikeUnlikeComment2DBConsumer")
|
||||
public DefaultMQPushConsumer mqPushConsumer() throws MQClientException {
|
||||
// Group 组
|
||||
String group = "han_note_group_" + MQConstants.TOPIC_COMMENT_LIKE_OR_UNLIKE;
|
||||
|
||||
// 创建一个新的 DefaultMQPushConsumer 实例,并指定消费者的消费组名
|
||||
consumer = new DefaultMQPushConsumer(group);
|
||||
|
||||
// 设置 RocketMQ 的 NameServer 地址
|
||||
consumer.setNamesrvAddr(nameServer);
|
||||
|
||||
// 订阅指定的主题,并设置主题的订阅规则("*" 表示订阅所有标签的消息)
|
||||
consumer.subscribe(MQConstants.TOPIC_COMMENT_LIKE_OR_UNLIKE, "*");
|
||||
|
||||
// 设置消费者消费消息的起始位置,如果队列中没有消息,则从最新的消息开始消费。
|
||||
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
|
||||
|
||||
// 设置消息消费模式,这里使用集群模式 (CLUSTERING)
|
||||
consumer.setMessageModel(MessageModel.CLUSTERING);
|
||||
|
||||
// 最大重试次数, 以防消息重试过多次仍然没有成功,避免消息卡在消费队列中。
|
||||
consumer.setMaxReconsumeTimes(3);
|
||||
// 设置每批次消费的最大消息数量,这里设置为 30,表示每次拉取时最多消费 30 条消息。
|
||||
consumer.setConsumeMessageBatchMaxSize(30);
|
||||
|
||||
// 注册消息监听器
|
||||
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
|
||||
log.info("==> 【评论点赞、取消点赞】本批次消息大小: {}", msgs.size());
|
||||
try {
|
||||
// 令牌桶流控, 以控制数据库能够承受的 QPS
|
||||
rateLimiter.acquire();
|
||||
|
||||
// 将批次 Json 消息体转换 DTO 集合
|
||||
List<LikeUnlikeCommentMqDTO> likeUnlikeCommentMqDTOS = Lists.newArrayList();
|
||||
|
||||
msgs.forEach(msg -> {
|
||||
String tag = msg.getTags(); // Tag 标签
|
||||
String msgJson = new String(msg.getBody()); // 消息体 Json 字符串
|
||||
log.info("==> 【评论点赞、取消点赞】Consumer - Tag: {}, Received message: {}", tag, msgJson);
|
||||
// Json 转 DTO
|
||||
likeUnlikeCommentMqDTOS.add(JsonUtils.parseObject(msgJson, LikeUnlikeCommentMqDTO.class));
|
||||
});
|
||||
|
||||
// 按评论 ID 分组
|
||||
Map<Long, List<LikeUnlikeCommentMqDTO>> commentIdAndListMap = likeUnlikeCommentMqDTOS.stream()
|
||||
.collect(Collectors.groupingBy(LikeUnlikeCommentMqDTO::getCommentId));
|
||||
|
||||
List<LikeUnlikeCommentMqDTO> finalLikeUnlikeCommentMqDTOS = Lists.newArrayList();
|
||||
|
||||
commentIdAndListMap.forEach((commentId, ops) -> {
|
||||
// 优化:若某个用户对某评论,多次操作,如点赞 -> 取消点赞 -> 点赞,需进行操作合并,只提取最后一次操作,进一步降低操作数据库的频率
|
||||
Map<Long, LikeUnlikeCommentMqDTO> userLastOp = ops.stream()
|
||||
.collect(Collectors.toMap(
|
||||
LikeUnlikeCommentMqDTO::getUserId, // 以发布评论的用户 ID 作为 Map 的键
|
||||
Function.identity(), // 直接使用 DTO 对象本身作为 Map 的值
|
||||
// 合并策略:当出现重复键(同一用户多次操作)时,保留时间更晚的记录
|
||||
(oldValue, newValue) ->
|
||||
oldValue.getCreateTime().isAfter(newValue.getCreateTime()) ? oldValue : newValue
|
||||
));
|
||||
|
||||
|
||||
finalLikeUnlikeCommentMqDTOS.addAll(userLastOp.values());
|
||||
});
|
||||
|
||||
// 批量操作数据库
|
||||
executeBatchSQL(finalLikeUnlikeCommentMqDTOS);
|
||||
|
||||
// 手动 ACK,告诉 RocketMQ 这批次消息消费成功
|
||||
return ConsumeOrderlyStatus.SUCCESS;
|
||||
} catch (Exception e) {
|
||||
log.error("", e);
|
||||
// 这样 RocketMQ 会暂停当前队列的消费一段时间,再重试
|
||||
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
|
||||
}
|
||||
});
|
||||
|
||||
// 启动消费者
|
||||
consumer.start();
|
||||
return consumer;
|
||||
}
|
||||
|
||||
private void executeBatchSQL(List<LikeUnlikeCommentMqDTO> values) {
|
||||
// 过滤出点赞操作
|
||||
List<LikeUnlikeCommentMqDTO> likes = values.stream()
|
||||
.filter(op -> Objects.equals(op.getType(), LikeUnlikeCommentTypeEnum.LIKE.getCode()))
|
||||
.toList();
|
||||
|
||||
// 过滤出取消点赞操作
|
||||
List<LikeUnlikeCommentMqDTO> unlikes = values.stream()
|
||||
.filter(op -> Objects.equals(op.getType(), LikeUnlikeCommentTypeEnum.UNLIKE.getCode()))
|
||||
.toList();
|
||||
|
||||
// 取消点赞:批量删除
|
||||
if (CollUtil.isNotEmpty(unlikes)) {
|
||||
commentLikeDOMapper.batchDelete(unlikes);
|
||||
}
|
||||
|
||||
// 点赞:批量新增
|
||||
if (CollUtil.isNotEmpty(likes)) {
|
||||
commentLikeDOMapper.batchInsert(likes);
|
||||
}
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void destroy() {
|
||||
if (Objects.nonNull(consumer)) {
|
||||
try {
|
||||
consumer.shutdown(); // 关闭消费者
|
||||
} catch (Exception e) {
|
||||
log.error("", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2,8 +2,46 @@ package com.hanserwei.hannote.comment.biz.domain.mapper;
|
||||
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import com.hanserwei.hannote.comment.biz.domain.dataobject.CommentLikeDO;
|
||||
import com.hanserwei.hannote.comment.biz.model.dto.LikeUnlikeCommentMqDTO;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
import org.apache.ibatis.annotations.Param;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Mapper
|
||||
public interface CommentLikeDOMapper extends BaseMapper<CommentLikeDO> {
|
||||
|
||||
/**
|
||||
* 查询某个评论是否被点赞
|
||||
*
|
||||
* @param userId 用户 ID
|
||||
* @param commentId 评论 ID
|
||||
* @return 1 表示已点赞,0 表示未点赞
|
||||
*/
|
||||
int selectCountByUserIdAndCommentId(@Param("userId") Long userId,
|
||||
@Param("commentId") Long commentId);
|
||||
|
||||
/**
|
||||
* 查询对应用户点赞的所有评论
|
||||
*
|
||||
* @param userId 用户 ID
|
||||
* @return 评论点赞列表
|
||||
*/
|
||||
List<CommentLikeDO> selectByUserId(@Param("userId") Long userId);
|
||||
|
||||
/**
|
||||
* 批量删除点赞记录
|
||||
*
|
||||
* @param unlikes 删除点赞记录
|
||||
* @return 删除数量
|
||||
*/
|
||||
int batchDelete(@Param("unlikes") List<LikeUnlikeCommentMqDTO> unlikes);
|
||||
|
||||
/**
|
||||
* 批量添加点赞记录
|
||||
*
|
||||
* @param likes 添加点赞记录
|
||||
* @return 添加数量
|
||||
*/
|
||||
int batchInsert(@Param("likes") List<LikeUnlikeCommentMqDTO> likes);
|
||||
}
|
||||
@@ -15,6 +15,7 @@ public enum ResponseCodeEnum implements BaseExceptionInterface {
|
||||
// ----------- 业务异常状态码 -----------
|
||||
COMMENT_NOT_FOUND("COMMENT-20001", "此评论不存在"),
|
||||
PARENT_COMMENT_NOT_FOUND("COMMENT-20000", "此父评论不存在"),
|
||||
COMMENT_ALREADY_LIKED("COMMENT-20002", "您已经点赞过该评论"),
|
||||
;
|
||||
|
||||
// 异常码
|
||||
|
||||
@@ -19,7 +19,9 @@ import com.hanserwei.framework.common.utils.JsonUtils;
|
||||
import com.hanserwei.hannote.comment.biz.constants.MQConstants;
|
||||
import com.hanserwei.hannote.comment.biz.constants.RedisKeyConstants;
|
||||
import com.hanserwei.hannote.comment.biz.domain.dataobject.CommentDO;
|
||||
import com.hanserwei.hannote.comment.biz.domain.dataobject.CommentLikeDO;
|
||||
import com.hanserwei.hannote.comment.biz.domain.mapper.CommentDOMapper;
|
||||
import com.hanserwei.hannote.comment.biz.domain.mapper.CommentLikeDOMapper;
|
||||
import com.hanserwei.hannote.comment.biz.domain.mapper.NoteCountDOMapper;
|
||||
import com.hanserwei.hannote.comment.biz.enums.CommentLevelEnum;
|
||||
import com.hanserwei.hannote.comment.biz.enums.CommentLikeLuaResultEnum;
|
||||
@@ -81,6 +83,8 @@ public class CommentServiceImpl extends ServiceImpl<CommentDOMapper, CommentDO>
|
||||
private RedisTemplate<String, Object> redisTemplate;
|
||||
@Resource(name = "taskExecutor")
|
||||
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
|
||||
@Resource
|
||||
private CommentLikeDOMapper commentLikeDOMapper;
|
||||
|
||||
/**
|
||||
* 评论详情本地缓存
|
||||
@@ -464,11 +468,39 @@ public class CommentServiceImpl extends ServiceImpl<CommentDOMapper, CommentDO>
|
||||
switch (commentLikeLuaResultEnum) {
|
||||
// Redis 中布隆过滤器不存在
|
||||
case NOT_EXIST -> {
|
||||
// TODO:
|
||||
// 从数据库中校验评论是否被点赞,并异步初始化布隆过滤器,设置过期时间
|
||||
int count = commentLikeDOMapper.selectCountByUserIdAndCommentId(userId, commentId);
|
||||
|
||||
// 保底1小小时+随机秒数
|
||||
long expireSeconds = 60 * 60 + RandomUtil.randomInt(60 * 60);
|
||||
|
||||
// 目标评论已经被点赞
|
||||
if (count > 0) {
|
||||
// 异步初始化布隆过滤器
|
||||
// 异步初始化布隆过滤器
|
||||
threadPoolTaskExecutor.submit(() ->
|
||||
batchAddCommentLike2BloomAndExpire(userId, expireSeconds, bloomUserCommentLikeListKey));
|
||||
|
||||
throw new ApiException(ResponseCodeEnum.COMMENT_ALREADY_LIKED);
|
||||
}
|
||||
// 若目标评论未被点赞,查询当前用户是否有点赞其他评论,有则同步初始化布隆过滤器
|
||||
batchAddCommentLike2BloomAndExpire(userId, expireSeconds, bloomUserCommentLikeListKey);
|
||||
|
||||
// 添加当前点赞评论 ID 到布隆过滤器中
|
||||
// Lua 脚本路径
|
||||
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/lua/bloom_add_comment_like_and_expire.lua")));
|
||||
// 返回值类型
|
||||
script.setResultType(Long.class);
|
||||
redisTemplate.execute(script, Collections.singletonList(bloomUserCommentLikeListKey), commentId, expireSeconds);
|
||||
}
|
||||
// 目标评论已经被点赞 (可能存在误判,需要进一步确认)
|
||||
case COMMENT_LIKED -> {
|
||||
// TODO:
|
||||
// 查询数据库校验是否点赞
|
||||
int count = commentLikeDOMapper.selectCountByUserIdAndCommentId(userId, commentId);
|
||||
|
||||
if (count > 0) {
|
||||
throw new ApiException(ResponseCodeEnum.COMMENT_ALREADY_LIKED);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -507,6 +539,38 @@ public class CommentServiceImpl extends ServiceImpl<CommentDOMapper, CommentDO>
|
||||
return Response.success();
|
||||
}
|
||||
|
||||
/**
|
||||
* 初始化评论点赞布隆过滤器
|
||||
*
|
||||
* @param userId 用户ID
|
||||
* @param expireSeconds 过期时间
|
||||
* @param bloomUserCommentLikeListKey 布隆过滤器 Key
|
||||
*/
|
||||
private void batchAddCommentLike2BloomAndExpire(Long userId, long expireSeconds, String bloomUserCommentLikeListKey) {
|
||||
try {
|
||||
// 查询该用户点赞的所有评论
|
||||
List<CommentLikeDO> commentLikeDOS = commentLikeDOMapper.selectByUserId(userId);
|
||||
|
||||
// 若不为空,批量添加到布隆过滤器中
|
||||
if (CollUtil.isNotEmpty(commentLikeDOS)) {
|
||||
DefaultRedisScript<Long> script = new DefaultRedisScript<>();
|
||||
// Lua 脚本路径
|
||||
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/lua/bloom_batch_add_comment_like_and_expire.lua")));
|
||||
// 返回值类型
|
||||
script.setResultType(Long.class);
|
||||
|
||||
// 构建 Lua 参数
|
||||
List<Object> luaArgs = Lists.newArrayList();
|
||||
commentLikeDOS.forEach(commentLikeDO ->
|
||||
luaArgs.add(commentLikeDO.getCommentId())); // 将每个点赞的评论 ID 传入
|
||||
luaArgs.add(expireSeconds); // 最后一个参数是过期时间(秒)
|
||||
redisTemplate.execute(script, Collections.singletonList(bloomUserCommentLikeListKey), luaArgs.toArray());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("## 异步初始化【评论点赞】布隆过滤器异常: ", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 校验被点赞的评论是否存在
|
||||
*
|
||||
|
||||
@@ -0,0 +1,10 @@
|
||||
-- 操作的 Key
|
||||
local key = KEYS[1]
|
||||
local commentId = ARGV[1] -- 评论ID
|
||||
local expireSeconds = ARGV[2] -- 过期时间(秒)
|
||||
|
||||
redis.call("BF.ADD", key, commentId)
|
||||
|
||||
-- 设置过期时间
|
||||
redis.call("EXPIRE", key, expireSeconds)
|
||||
return 0
|
||||
@@ -0,0 +1,12 @@
|
||||
-- 操作的 Key
|
||||
local key = KEYS[1]
|
||||
|
||||
for i = 1, #ARGV - 1 do
|
||||
redis.call("BF.ADD", key, ARGV[i])
|
||||
end
|
||||
|
||||
---- 最后一个参数为过期时间
|
||||
local expireTime = ARGV[#ARGV]
|
||||
-- 设置过期时间
|
||||
redis.call("EXPIRE", key, expireTime)
|
||||
return 0
|
||||
@@ -13,4 +13,36 @@
|
||||
<!--@mbg.generated-->
|
||||
id, user_id, comment_id, create_time
|
||||
</sql>
|
||||
|
||||
<select id="selectCountByUserIdAndCommentId" resultType="int" parameterType="map">
|
||||
select count(1)
|
||||
from t_comment_like
|
||||
where user_id = #{userId}
|
||||
and comment_id = #{commentId}
|
||||
limit 1
|
||||
</select>
|
||||
|
||||
<select id="selectByUserId" resultMap="BaseResultMap" parameterType="map">
|
||||
select comment_id
|
||||
from t_comment_like
|
||||
where user_id = #{userId}
|
||||
</select>
|
||||
|
||||
<delete id="batchDelete" parameterType="map">
|
||||
DELETE
|
||||
FROM t_comment_like
|
||||
WHERE (comment_id, user_id) IN
|
||||
<foreach collection="unlikes" item="unlike" open="(" separator="," close=")">
|
||||
(#{unlike.commentId}, #{unlike.userId})
|
||||
</foreach>
|
||||
</delete>
|
||||
|
||||
<insert id="batchInsert" parameterType="list">
|
||||
INSERT INTO t_comment_like (comment_id, user_id, create_time)
|
||||
VALUES
|
||||
<foreach collection="likes" item="like" separator=",">
|
||||
(#{like.commentId}, #{like.userId}, #{like.createTime})
|
||||
</foreach>
|
||||
ON DUPLICATE KEY UPDATE id=id
|
||||
</insert>
|
||||
</mapper>
|
||||
Reference in New Issue
Block a user