diff --git a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/constants/MQConstants.java b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/constants/MQConstants.java index 1e07ffc..25b2a16 100644 --- a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/constants/MQConstants.java +++ b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/constants/MQConstants.java @@ -27,4 +27,9 @@ public interface MQConstants { */ String TAG_LIKE = "Like"; + /** + * Tag 标签:取消点赞 + */ + String TAG_UNLIKE = "UnLike"; + } \ No newline at end of file diff --git a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/consumer/Comment2DBConsumer.java b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/consumer/Comment2DBConsumer.java index c9738f7..7575be5 100644 --- a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/consumer/Comment2DBConsumer.java +++ b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/consumer/Comment2DBConsumer.java @@ -65,7 +65,7 @@ public class Comment2DBConsumer { // 每秒创建 1000 个令牌 private final RateLimiter rateLimiter = RateLimiter.create(1000); - @Bean + @Bean(name = "Comment2DBConsumer") public DefaultMQPushConsumer mqPushConsumer() throws MQClientException { // Group组 String group = "han_note_group_" + MQConstants.TOPIC_PUBLISH_COMMENT; diff --git a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/consumer/LikeUnlikeComment2DBConsumer.java b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/consumer/LikeUnlikeComment2DBConsumer.java new file mode 100644 index 0000000..f8c32e0 --- /dev/null +++ b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/consumer/LikeUnlikeComment2DBConsumer.java @@ -0,0 +1,156 @@ +package com.hanserwei.hannote.comment.biz.consumer; + +import cn.hutool.core.collection.CollUtil; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.RateLimiter; +import com.hanserwei.framework.common.utils.JsonUtils; +import com.hanserwei.hannote.comment.biz.constants.MQConstants; +import com.hanserwei.hannote.comment.biz.domain.mapper.CommentLikeDOMapper; +import com.hanserwei.hannote.comment.biz.enums.LikeUnlikeCommentTypeEnum; +import com.hanserwei.hannote.comment.biz.model.dto.LikeUnlikeCommentMqDTO; +import jakarta.annotation.PreDestroy; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Collectors; + +@SuppressWarnings("UnstableApiUsage") +@Component +@Slf4j +public class LikeUnlikeComment2DBConsumer { + + // 每秒创建 5000 个令牌 + private final RateLimiter rateLimiter = RateLimiter.create(5000); + @Value("${rocketmq.name-server}") + private String nameServer; + @Resource + private CommentLikeDOMapper commentLikeDOMapper; + private DefaultMQPushConsumer consumer; + + @Bean(name = "LikeUnlikeComment2DBConsumer") + public DefaultMQPushConsumer mqPushConsumer() throws MQClientException { + // Group 组 + String group = "han_note_group_" + MQConstants.TOPIC_COMMENT_LIKE_OR_UNLIKE; + + // 创建一个新的 DefaultMQPushConsumer 实例,并指定消费者的消费组名 + consumer = new DefaultMQPushConsumer(group); + + // 设置 RocketMQ 的 NameServer 地址 + consumer.setNamesrvAddr(nameServer); + + // 订阅指定的主题,并设置主题的订阅规则("*" 表示订阅所有标签的消息) + consumer.subscribe(MQConstants.TOPIC_COMMENT_LIKE_OR_UNLIKE, "*"); + + // 设置消费者消费消息的起始位置,如果队列中没有消息,则从最新的消息开始消费。 + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); + + // 设置消息消费模式,这里使用集群模式 (CLUSTERING) + consumer.setMessageModel(MessageModel.CLUSTERING); + + // 最大重试次数, 以防消息重试过多次仍然没有成功,避免消息卡在消费队列中。 + consumer.setMaxReconsumeTimes(3); + // 设置每批次消费的最大消息数量,这里设置为 30,表示每次拉取时最多消费 30 条消息。 + consumer.setConsumeMessageBatchMaxSize(30); + + // 注册消息监听器 + consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> { + log.info("==> 【评论点赞、取消点赞】本批次消息大小: {}", msgs.size()); + try { + // 令牌桶流控, 以控制数据库能够承受的 QPS + rateLimiter.acquire(); + + // 将批次 Json 消息体转换 DTO 集合 + List likeUnlikeCommentMqDTOS = Lists.newArrayList(); + + msgs.forEach(msg -> { + String tag = msg.getTags(); // Tag 标签 + String msgJson = new String(msg.getBody()); // 消息体 Json 字符串 + log.info("==> 【评论点赞、取消点赞】Consumer - Tag: {}, Received message: {}", tag, msgJson); + // Json 转 DTO + likeUnlikeCommentMqDTOS.add(JsonUtils.parseObject(msgJson, LikeUnlikeCommentMqDTO.class)); + }); + + // 按评论 ID 分组 + Map> commentIdAndListMap = likeUnlikeCommentMqDTOS.stream() + .collect(Collectors.groupingBy(LikeUnlikeCommentMqDTO::getCommentId)); + + List finalLikeUnlikeCommentMqDTOS = Lists.newArrayList(); + + commentIdAndListMap.forEach((commentId, ops) -> { + // 优化:若某个用户对某评论,多次操作,如点赞 -> 取消点赞 -> 点赞,需进行操作合并,只提取最后一次操作,进一步降低操作数据库的频率 + Map userLastOp = ops.stream() + .collect(Collectors.toMap( + LikeUnlikeCommentMqDTO::getUserId, // 以发布评论的用户 ID 作为 Map 的键 + Function.identity(), // 直接使用 DTO 对象本身作为 Map 的值 + // 合并策略:当出现重复键(同一用户多次操作)时,保留时间更晚的记录 + (oldValue, newValue) -> + oldValue.getCreateTime().isAfter(newValue.getCreateTime()) ? oldValue : newValue + )); + + + finalLikeUnlikeCommentMqDTOS.addAll(userLastOp.values()); + }); + + // 批量操作数据库 + executeBatchSQL(finalLikeUnlikeCommentMqDTOS); + + // 手动 ACK,告诉 RocketMQ 这批次消息消费成功 + return ConsumeOrderlyStatus.SUCCESS; + } catch (Exception e) { + log.error("", e); + // 这样 RocketMQ 会暂停当前队列的消费一段时间,再重试 + return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; + } + }); + + // 启动消费者 + consumer.start(); + return consumer; + } + + private void executeBatchSQL(List values) { + // 过滤出点赞操作 + List likes = values.stream() + .filter(op -> Objects.equals(op.getType(), LikeUnlikeCommentTypeEnum.LIKE.getCode())) + .toList(); + + // 过滤出取消点赞操作 + List unlikes = values.stream() + .filter(op -> Objects.equals(op.getType(), LikeUnlikeCommentTypeEnum.UNLIKE.getCode())) + .toList(); + + // 取消点赞:批量删除 + if (CollUtil.isNotEmpty(unlikes)) { + commentLikeDOMapper.batchDelete(unlikes); + } + + // 点赞:批量新增 + if (CollUtil.isNotEmpty(likes)) { + commentLikeDOMapper.batchInsert(likes); + } + } + + @PreDestroy + public void destroy() { + if (Objects.nonNull(consumer)) { + try { + consumer.shutdown(); // 关闭消费者 + } catch (Exception e) { + log.error("", e); + } + } + } +} diff --git a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/controller/CommentController.java b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/controller/CommentController.java index 8b9800d..55275dd 100644 --- a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/controller/CommentController.java +++ b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/controller/CommentController.java @@ -45,4 +45,10 @@ public class CommentController { return commentService.likeComment(likeCommentReqVO); } + @PostMapping("/unlike") + @ApiOperationLog(description = "评论取消点赞") + public Response unlikeComment(@Validated @RequestBody UnLikeCommentReqVO unLikeCommentReqVO) { + return commentService.unlikeComment(unLikeCommentReqVO); + } + } \ No newline at end of file diff --git a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/domain/mapper/CommentLikeDOMapper.java b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/domain/mapper/CommentLikeDOMapper.java index fa0da8d..a8213ea 100644 --- a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/domain/mapper/CommentLikeDOMapper.java +++ b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/domain/mapper/CommentLikeDOMapper.java @@ -2,8 +2,46 @@ package com.hanserwei.hannote.comment.biz.domain.mapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.hanserwei.hannote.comment.biz.domain.dataobject.CommentLikeDO; +import com.hanserwei.hannote.comment.biz.model.dto.LikeUnlikeCommentMqDTO; import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; + +import java.util.List; @Mapper public interface CommentLikeDOMapper extends BaseMapper { + + /** + * 查询某个评论是否被点赞 + * + * @param userId 用户 ID + * @param commentId 评论 ID + * @return 1 表示已点赞,0 表示未点赞 + */ + int selectCountByUserIdAndCommentId(@Param("userId") Long userId, + @Param("commentId") Long commentId); + + /** + * 查询对应用户点赞的所有评论 + * + * @param userId 用户 ID + * @return 评论点赞列表 + */ + List selectByUserId(@Param("userId") Long userId); + + /** + * 批量删除点赞记录 + * + * @param unlikes 删除点赞记录 + * @return 删除数量 + */ + int batchDelete(@Param("unlikes") List unlikes); + + /** + * 批量添加点赞记录 + * + * @param likes 添加点赞记录 + * @return 添加数量 + */ + int batchInsert(@Param("likes") List likes); } \ No newline at end of file diff --git a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/enums/CommentUnlikeLuaResultEnum.java b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/enums/CommentUnlikeLuaResultEnum.java new file mode 100644 index 0000000..d58b967 --- /dev/null +++ b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/enums/CommentUnlikeLuaResultEnum.java @@ -0,0 +1,35 @@ +package com.hanserwei.hannote.comment.biz.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.util.Objects; + +@Getter +@AllArgsConstructor +public enum CommentUnlikeLuaResultEnum { + // 布隆过滤器不存在 + NOT_EXIST(-1L), + // 评论已点赞 + COMMENT_LIKED(1L), + // 评论未点赞 + COMMENT_NOT_LIKED(0L), + ; + + private final Long code; + + /** + * 根据类型 code 获取对应的枚举 + * + * @param code 类型 code + * @return 枚举 + */ + public static CommentUnlikeLuaResultEnum valueOf(Long code) { + for (CommentUnlikeLuaResultEnum commentUnlikeLuaResultEnum : CommentUnlikeLuaResultEnum.values()) { + if (Objects.equals(code, commentUnlikeLuaResultEnum.getCode())) { + return commentUnlikeLuaResultEnum; + } + } + return null; + } +} \ No newline at end of file diff --git a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/enums/ResponseCodeEnum.java b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/enums/ResponseCodeEnum.java index 76c35fb..6d09284 100644 --- a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/enums/ResponseCodeEnum.java +++ b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/enums/ResponseCodeEnum.java @@ -15,6 +15,8 @@ public enum ResponseCodeEnum implements BaseExceptionInterface { // ----------- 业务异常状态码 ----------- COMMENT_NOT_FOUND("COMMENT-20001", "此评论不存在"), PARENT_COMMENT_NOT_FOUND("COMMENT-20000", "此父评论不存在"), + COMMENT_ALREADY_LIKED("COMMENT-20002", "您已经点赞过该评论"), + COMMENT_NOT_LIKED("COMMENT-20003", "您未点赞该评论,无法取消点赞"), ; // 异常码 diff --git a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/model/vo/UnLikeCommentReqVO.java b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/model/vo/UnLikeCommentReqVO.java new file mode 100644 index 0000000..fa84c23 --- /dev/null +++ b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/model/vo/UnLikeCommentReqVO.java @@ -0,0 +1,18 @@ +package com.hanserwei.hannote.comment.biz.model.vo; + +import jakarta.validation.constraints.NotNull; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class UnLikeCommentReqVO { + + @NotNull(message = "评论 ID 不能为空") + private Long commentId; + +} \ No newline at end of file diff --git a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/service/CommentService.java b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/service/CommentService.java index a81d93a..7b18ebc 100644 --- a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/service/CommentService.java +++ b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/service/CommentService.java @@ -38,4 +38,12 @@ public interface CommentService extends IService { * @return 响应 */ Response likeComment(LikeCommentReqVO likeCommentReqVO); + + /** + * 取消评论点赞 + * + * @param unLikeCommentReqVO 取消评论点赞请求 + * @return 响应 + */ + Response unlikeComment(UnLikeCommentReqVO unLikeCommentReqVO); } diff --git a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/service/impl/CommentServiceImpl.java b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/service/impl/CommentServiceImpl.java index c97ce8d..7a8ca8b 100644 --- a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/service/impl/CommentServiceImpl.java +++ b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/service/impl/CommentServiceImpl.java @@ -19,12 +19,11 @@ import com.hanserwei.framework.common.utils.JsonUtils; import com.hanserwei.hannote.comment.biz.constants.MQConstants; import com.hanserwei.hannote.comment.biz.constants.RedisKeyConstants; import com.hanserwei.hannote.comment.biz.domain.dataobject.CommentDO; +import com.hanserwei.hannote.comment.biz.domain.dataobject.CommentLikeDO; import com.hanserwei.hannote.comment.biz.domain.mapper.CommentDOMapper; +import com.hanserwei.hannote.comment.biz.domain.mapper.CommentLikeDOMapper; import com.hanserwei.hannote.comment.biz.domain.mapper.NoteCountDOMapper; -import com.hanserwei.hannote.comment.biz.enums.CommentLevelEnum; -import com.hanserwei.hannote.comment.biz.enums.CommentLikeLuaResultEnum; -import com.hanserwei.hannote.comment.biz.enums.LikeUnlikeCommentTypeEnum; -import com.hanserwei.hannote.comment.biz.enums.ResponseCodeEnum; +import com.hanserwei.hannote.comment.biz.enums.*; import com.hanserwei.hannote.comment.biz.model.dto.LikeUnlikeCommentMqDTO; import com.hanserwei.hannote.comment.biz.model.dto.PublishCommentMqDTO; import com.hanserwei.hannote.comment.biz.model.vo.*; @@ -81,6 +80,8 @@ public class CommentServiceImpl extends ServiceImpl private RedisTemplate redisTemplate; @Resource(name = "taskExecutor") private ThreadPoolTaskExecutor threadPoolTaskExecutor; + @Resource + private CommentLikeDOMapper commentLikeDOMapper; /** * 评论详情本地缓存 @@ -464,11 +465,39 @@ public class CommentServiceImpl extends ServiceImpl switch (commentLikeLuaResultEnum) { // Redis 中布隆过滤器不存在 case NOT_EXIST -> { - // TODO: + // 从数据库中校验评论是否被点赞,并异步初始化布隆过滤器,设置过期时间 + int count = commentLikeDOMapper.selectCountByUserIdAndCommentId(userId, commentId); + + // 保底1小小时+随机秒数 + long expireSeconds = 60 * 60 + RandomUtil.randomInt(60 * 60); + + // 目标评论已经被点赞 + if (count > 0) { + // 异步初始化布隆过滤器 + // 异步初始化布隆过滤器 + threadPoolTaskExecutor.submit(() -> + batchAddCommentLike2BloomAndExpire(userId, expireSeconds, bloomUserCommentLikeListKey)); + + throw new ApiException(ResponseCodeEnum.COMMENT_ALREADY_LIKED); + } + // 若目标评论未被点赞,查询当前用户是否有点赞其他评论,有则同步初始化布隆过滤器 + batchAddCommentLike2BloomAndExpire(userId, expireSeconds, bloomUserCommentLikeListKey); + + // 添加当前点赞评论 ID 到布隆过滤器中 + // Lua 脚本路径 + script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/lua/bloom_add_comment_like_and_expire.lua"))); + // 返回值类型 + script.setResultType(Long.class); + redisTemplate.execute(script, Collections.singletonList(bloomUserCommentLikeListKey), commentId, expireSeconds); } // 目标评论已经被点赞 (可能存在误判,需要进一步确认) case COMMENT_LIKED -> { - // TODO: + // 查询数据库校验是否点赞 + int count = commentLikeDOMapper.selectCountByUserIdAndCommentId(userId, commentId); + + if (count > 0) { + throw new ApiException(ResponseCodeEnum.COMMENT_ALREADY_LIKED); + } } } @@ -507,6 +536,122 @@ public class CommentServiceImpl extends ServiceImpl return Response.success(); } + @Override + public Response unlikeComment(UnLikeCommentReqVO unLikeCommentReqVO) { + // 被取消点赞的评论 ID + Long commentId = unLikeCommentReqVO.getCommentId(); + + // 1. 校验评论是否存在 + checkCommentIsExist(commentId); + + // 2. 校验评论是否被点赞过 + // 当前登录用户ID + Long userId = LoginUserContextHolder.getUserId(); + // 布隆过滤器 Key + String bloomUserCommentLikeListKey = RedisKeyConstants.buildBloomCommentLikesKey(userId); + + DefaultRedisScript script = new DefaultRedisScript<>(); + // Lua 脚本路径 + script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/lua/bloom_comment_unlike_check.lua"))); + // 返回值类型 + script.setResultType(Long.class); + + // 执行 Lua 脚本,拿到返回结果 + Long result = redisTemplate.execute(script, Collections.singletonList(bloomUserCommentLikeListKey), commentId); + + CommentUnlikeLuaResultEnum commentUnlikeLuaResultEnum = CommentUnlikeLuaResultEnum.valueOf(result); + + if (Objects.isNull(commentUnlikeLuaResultEnum)) { + throw new ApiException(ResponseCodeEnum.PARAM_NOT_VALID); + } + + switch (commentUnlikeLuaResultEnum) { + // 布隆过滤器不存在 + case NOT_EXIST -> { + // 异步初始化布隆过滤器 + threadPoolTaskExecutor.submit(() -> { + // 保底1小时+随机秒数 + long expireSeconds = 60 * 60 + RandomUtil.randomInt(60 * 60); + batchAddCommentLike2BloomAndExpire(userId, expireSeconds, bloomUserCommentLikeListKey); + }); + + // 从数据库中校验评论是否被点赞 + int count = commentLikeDOMapper.selectCountByUserIdAndCommentId(userId, commentId); + + // 未点赞,无法取消点赞操作,抛出业务异常 + if (count == 0) throw new ApiException(ResponseCodeEnum.COMMENT_NOT_LIKED); + } + // 布隆过滤器校验目标评论未被点赞(判断绝对正确) + case COMMENT_NOT_LIKED -> throw new ApiException(ResponseCodeEnum.COMMENT_NOT_LIKED); + } + + // 3. 发送顺序 MQ,删除评论点赞记录 + // 构建消息体 DTO + LikeUnlikeCommentMqDTO likeUnlikeCommentMqDTO = LikeUnlikeCommentMqDTO.builder() + .userId(userId) + .commentId(commentId) + .type(LikeUnlikeCommentTypeEnum.UNLIKE.getCode()) // 取消点赞评论 + .createTime(LocalDateTime.now()) + .build(); + + // 构建消息对象,并将 DTO 转成 Json 字符串设置到消息体中 + Message message = MessageBuilder.withPayload(JsonUtils.toJsonString(likeUnlikeCommentMqDTO)) + .build(); + + // 通过冒号连接, 可让 MQ 发送给主题 Topic 时,携带上标签 Tag + String destination = MQConstants.TOPIC_COMMENT_LIKE_OR_UNLIKE + ":" + MQConstants.TAG_UNLIKE; + + // MQ 分区键 + String hashKey = String.valueOf(userId); + + // 异步发送 MQ 顺序消息,提升接口响应速度 + rocketMQTemplate.asyncSendOrderly(destination, message, hashKey, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + log.info("==> 【评论取消点赞】MQ 发送成功,SendResult: {}", sendResult); + } + + @Override + public void onException(Throwable throwable) { + log.error("==> 【评论取消点赞】MQ 发送异常: ", throwable); + } + }); + + return Response.success(); + } + + /** + * 初始化评论点赞布隆过滤器 + * + * @param userId 用户ID + * @param expireSeconds 过期时间 + * @param bloomUserCommentLikeListKey 布隆过滤器 Key + */ + private void batchAddCommentLike2BloomAndExpire(Long userId, long expireSeconds, String bloomUserCommentLikeListKey) { + try { + // 查询该用户点赞的所有评论 + List commentLikeDOS = commentLikeDOMapper.selectByUserId(userId); + + // 若不为空,批量添加到布隆过滤器中 + if (CollUtil.isNotEmpty(commentLikeDOS)) { + DefaultRedisScript script = new DefaultRedisScript<>(); + // Lua 脚本路径 + script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/lua/bloom_batch_add_comment_like_and_expire.lua"))); + // 返回值类型 + script.setResultType(Long.class); + + // 构建 Lua 参数 + List luaArgs = Lists.newArrayList(); + commentLikeDOS.forEach(commentLikeDO -> + luaArgs.add(commentLikeDO.getCommentId())); // 将每个点赞的评论 ID 传入 + luaArgs.add(expireSeconds); // 最后一个参数是过期时间(秒) + redisTemplate.execute(script, Collections.singletonList(bloomUserCommentLikeListKey), luaArgs.toArray()); + } + } catch (Exception e) { + log.error("## 异步初始化【评论点赞】布隆过滤器异常: ", e); + } + } + /** * 校验被点赞的评论是否存在 * diff --git a/han-note-comment/han-note-comment-biz/src/main/resources/lua/bloom_add_comment_like_and_expire.lua b/han-note-comment/han-note-comment-biz/src/main/resources/lua/bloom_add_comment_like_and_expire.lua new file mode 100644 index 0000000..fb77769 --- /dev/null +++ b/han-note-comment/han-note-comment-biz/src/main/resources/lua/bloom_add_comment_like_and_expire.lua @@ -0,0 +1,10 @@ +-- 操作的 Key +local key = KEYS[1] +local commentId = ARGV[1] -- 评论ID +local expireSeconds = ARGV[2] -- 过期时间(秒) + +redis.call("BF.ADD", key, commentId) + +-- 设置过期时间 +redis.call("EXPIRE", key, expireSeconds) +return 0 diff --git a/han-note-comment/han-note-comment-biz/src/main/resources/lua/bloom_batch_add_comment_like_and_expire.lua b/han-note-comment/han-note-comment-biz/src/main/resources/lua/bloom_batch_add_comment_like_and_expire.lua new file mode 100644 index 0000000..4d2345b --- /dev/null +++ b/han-note-comment/han-note-comment-biz/src/main/resources/lua/bloom_batch_add_comment_like_and_expire.lua @@ -0,0 +1,12 @@ +-- 操作的 Key +local key = KEYS[1] + +for i = 1, #ARGV - 1 do + redis.call("BF.ADD", key, ARGV[i]) +end + +---- 最后一个参数为过期时间 +local expireTime = ARGV[#ARGV] +-- 设置过期时间 +redis.call("EXPIRE", key, expireTime) +return 0 diff --git a/han-note-comment/han-note-comment-biz/src/main/resources/lua/bloom_comment_unlike_check.lua b/han-note-comment/han-note-comment-biz/src/main/resources/lua/bloom_comment_unlike_check.lua new file mode 100644 index 0000000..72e1cdb --- /dev/null +++ b/han-note-comment/han-note-comment-biz/src/main/resources/lua/bloom_comment_unlike_check.lua @@ -0,0 +1,11 @@ +local key = KEYS[1] -- 操作的 Redis Key +local commentId = ARGV[1] -- 评论ID + +-- 使用 EXISTS 命令检查布隆过滤器是否存在 +local exists = redis.call('EXISTS', key) +if exists == 0 then + return -1 +end + +-- 校验该评论是否被点赞过(1 表示已经点赞,0 表示未点赞) +return redis.call('BF.EXISTS', key, commentId) \ No newline at end of file diff --git a/han-note-comment/han-note-comment-biz/src/main/resources/mapperxml/CommentLikeDOMapper.xml b/han-note-comment/han-note-comment-biz/src/main/resources/mapperxml/CommentLikeDOMapper.xml index 3eaef22..0444e7a 100644 --- a/han-note-comment/han-note-comment-biz/src/main/resources/mapperxml/CommentLikeDOMapper.xml +++ b/han-note-comment/han-note-comment-biz/src/main/resources/mapperxml/CommentLikeDOMapper.xml @@ -13,4 +13,36 @@ id, user_id, comment_id, create_time + + + + + + + DELETE + FROM t_comment_like + WHERE (comment_id, user_id) IN + + (#{unlike.commentId}, #{unlike.userId}) + + + + + INSERT INTO t_comment_like (comment_id, user_id, create_time) + VALUES + + (#{like.commentId}, #{like.userId}, #{like.createTime}) + + ON DUPLICATE KEY UPDATE id=id + \ No newline at end of file diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/constant/MQConstants.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/constant/MQConstants.java index 876798a..0daa74b 100644 --- a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/constant/MQConstants.java +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/constant/MQConstants.java @@ -57,6 +57,16 @@ public interface MQConstants { */ String TOPIC_NOTE_OPERATE = "NoteOperateTopic"; + /** + * Topic: 评论点赞数更新 + */ + String TOPIC_COMMENT_LIKE_OR_UNLIKE = "CommentLikeUnlikeTopic"; + + /** + * Topic: 计数 - 评论点赞数落库 + */ + String TOPIC_COUNT_COMMENT_LIKE_2_DB = "CountCommentLike2DBTTopic"; + /** * Tag 标签:笔记发布 */ diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountCommentLike2DBConsumer.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountCommentLike2DBConsumer.java new file mode 100644 index 0000000..98fa1ef --- /dev/null +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountCommentLike2DBConsumer.java @@ -0,0 +1,55 @@ +package com.hanserwei.hannote.count.biz.consumer; + +import cn.hutool.core.collection.CollUtil; +import com.google.common.util.concurrent.RateLimiter; +import com.hanserwei.framework.common.utils.JsonUtils; +import com.hanserwei.hannote.count.biz.constant.MQConstants; +import com.hanserwei.hannote.count.biz.domain.mapper.CommentDOMapper; +import com.hanserwei.hannote.count.biz.model.dto.AggregationCountLikeUnlikeCommentMqDTO; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +import java.util.List; + +@SuppressWarnings("ALL") +@Component +@RocketMQMessageListener(consumerGroup = "han_note_group_" + MQConstants.TOPIC_COUNT_COMMENT_LIKE_2_DB, // Group 组 + topic = MQConstants.TOPIC_COUNT_COMMENT_LIKE_2_DB // 主题 Topic +) +@Slf4j +public class CountCommentLike2DBConsumer implements RocketMQListener { + + // 每秒创建 5000 个令牌 + private final RateLimiter rateLimiter = RateLimiter.create(5000); + @Resource + private CommentDOMapper commentDOMapper; + + @Override + public void onMessage(String body) { + // 流量削峰:通过获取令牌,如果没有令牌可用,将阻塞,直到获得 + rateLimiter.acquire(); + + log.info("## 消费到了 MQ 【计数: 评论点赞数入库】, {}...", body); + + List countList = null; + try { + countList = JsonUtils.parseList(body, AggregationCountLikeUnlikeCommentMqDTO.class); + } catch (Exception e) { + log.error("## 解析 JSON 字符串异常", e); + } + + if (CollUtil.isNotEmpty(countList)) { + // 更新评论点赞数 + countList.forEach(item -> { + Long commentId = item.getCommentId(); + Integer count = item.getCount(); + + commentDOMapper.updateLikeTotalByCommentId(count, commentId); + }); + } + } + +} \ No newline at end of file diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountCommentLikeConsumer.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountCommentLikeConsumer.java new file mode 100644 index 0000000..9c6a230 --- /dev/null +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountCommentLikeConsumer.java @@ -0,0 +1,138 @@ +package com.hanserwei.hannote.count.biz.consumer; + +import com.github.phantomthief.collection.BufferTrigger; +import com.google.common.collect.Lists; +import com.hanserwei.framework.common.utils.JsonUtils; +import com.hanserwei.hannote.count.biz.constant.MQConstants; +import com.hanserwei.hannote.count.biz.constant.RedisKeyConstants; +import com.hanserwei.hannote.count.biz.enums.LikeUnlikeCommentTypeEnum; +import com.hanserwei.hannote.count.biz.model.dto.AggregationCountLikeUnlikeCommentMqDTO; +import com.hanserwei.hannote.count.biz.model.dto.CountLikeUnlikeCommentMqDTO; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +@Component +@RocketMQMessageListener(consumerGroup = "han_note_group_count_" + MQConstants.TOPIC_COMMENT_LIKE_OR_UNLIKE, // Group 组 + topic = MQConstants.TOPIC_COMMENT_LIKE_OR_UNLIKE // 主题 Topic +) +@Slf4j +public class CountCommentLikeConsumer implements RocketMQListener { + + @Resource + private RedisTemplate redisTemplate; + @Resource + private RocketMQTemplate rocketMQTemplate; + + private final BufferTrigger bufferTrigger = BufferTrigger.batchBlocking() + .bufferSize(50000) // 缓存队列的最大容量 + .batchSize(1000) // 一批次最多聚合 1000 条 + .linger(Duration.ofSeconds(1)) // 多久聚合一次 + .setConsumerEx(this::consumeMessage) // 设置消费者方法 + .build(); + + @Override + public void onMessage(String body) { + // 往 bufferTrigger 中添加元素 + bufferTrigger.enqueue(body); + } + + private void consumeMessage(List bodys) { + log.info("==> 【评论点赞数】聚合消息, size: {}", bodys.size()); + log.info("==> 【评论点赞数】聚合消息, {}", JsonUtils.toJsonString(bodys)); + + // List 转 List + List countLikeUnlikeCommentMqDTOS = bodys.stream() + .map(body -> JsonUtils.parseObject(body, CountLikeUnlikeCommentMqDTO.class)).toList(); + + // 按评论 ID 进行分组 + Map> groupMap = countLikeUnlikeCommentMqDTOS.stream() + .collect(Collectors.groupingBy(CountLikeUnlikeCommentMqDTO::getCommentId)); + + // 按组汇总数据,统计出最终的计数 + // 最终操作的计数对象 + List countList = Lists.newArrayList(); + + for (Map.Entry> entry : groupMap.entrySet()) { + // 评论 ID + Long commentId = entry.getKey(); + + List list = entry.getValue(); + // 最终的计数值,默认为 0 + int finalCount = 0; + for (CountLikeUnlikeCommentMqDTO countLikeUnlikeCommentMqDTO : list) { + // 获取操作类型 + Integer type = countLikeUnlikeCommentMqDTO.getType(); + + // 根据操作类型,获取对应枚举 + LikeUnlikeCommentTypeEnum likeUnlikeCommentTypeEnum = LikeUnlikeCommentTypeEnum.valueOf(type); + + // 若枚举为空,跳到下一次循环 + if (Objects.isNull(likeUnlikeCommentTypeEnum)) continue; + + switch (likeUnlikeCommentTypeEnum) { + case LIKE -> finalCount += 1; // 如果为点赞操作,点赞数 +1 + case UNLIKE -> finalCount -= 1; // 如果为取消点赞操作,点赞数 -1 + } + } + // 将分组后统计出的最终计数,存入 countList 中 + countList.add(AggregationCountLikeUnlikeCommentMqDTO.builder() + .commentId(commentId) + .count(finalCount) + .build()); + } + + log.info("## 【评论点赞数】聚合后的计数数据: {}", JsonUtils.toJsonString(countList)); + + // 更新 Redis + countList.forEach(item -> { + // 评论 ID + Long commentId = item.getCommentId(); + // 聚合后的计数 + Integer count = item.getCount(); + + // Redis 中评论计数 Hash Key + String countCommentRedisKey = RedisKeyConstants.buildCountCommentKey(commentId); + // 判断 Redis 中 Hash 是否存在 + boolean isCountCommentExisted = redisTemplate.hasKey(countCommentRedisKey); + + // 若存在才会更新 + // (因为缓存设有过期时间,考虑到过期后,缓存会被删除,这里需要判断一下,存在才会去更新,而初始化工作放在查询计数来做) + if (isCountCommentExisted) { + // 对目标用户 Hash 中的点赞数字段进行计数操作 + redisTemplate.opsForHash().increment(countCommentRedisKey, RedisKeyConstants.FIELD_LIKE_TOTAL, count); + } + }); + + // 发送 MQ, 评论点赞数据落库 + Message message = MessageBuilder.withPayload(JsonUtils.toJsonString(countList)) + .build(); + + // 异步发送 MQ 消息 + rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_COMMENT_LIKE_2_DB, message, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + log.info("==> 【计数服务:评论点赞数写库】MQ 发送成功,SendResult: {}", sendResult); + } + + @Override + public void onException(Throwable throwable) { + log.error("==> 【计数服务:评论点赞数写库】MQ 发送异常: ", throwable); + } + }); + } +} \ No newline at end of file diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/domain/mapper/CommentDOMapper.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/domain/mapper/CommentDOMapper.java index 5acd9fa..e5d80e1 100644 --- a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/domain/mapper/CommentDOMapper.java +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/domain/mapper/CommentDOMapper.java @@ -16,4 +16,14 @@ public interface CommentDOMapper extends BaseMapper { * @return 更新结果 */ int updateChildCommentTotal(@Param("parentId") Long parentId, @Param("count") int count); + + /** + * 更新评论点赞数 + * + * @param count 计数 + * @param commentId 评论 ID + * @return 更新结果 + */ + int updateLikeTotalByCommentId(@Param("count") Integer count, + @Param("commentId") Long commentId); } \ No newline at end of file diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/enums/LikeUnlikeCommentTypeEnum.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/enums/LikeUnlikeCommentTypeEnum.java new file mode 100644 index 0000000..224717a --- /dev/null +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/enums/LikeUnlikeCommentTypeEnum.java @@ -0,0 +1,27 @@ +package com.hanserwei.hannote.count.biz.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.util.Objects; + +@Getter +@AllArgsConstructor +public enum LikeUnlikeCommentTypeEnum { + // 点赞 + LIKE(1), + // 取消点赞 + UNLIKE(0), + ; + + private final Integer code; + + public static LikeUnlikeCommentTypeEnum valueOf(Integer code) { + for (LikeUnlikeCommentTypeEnum likeUnlikeCommentTypeEnum : LikeUnlikeCommentTypeEnum.values()) { + if (Objects.equals(code, likeUnlikeCommentTypeEnum.getCode())) { + return likeUnlikeCommentTypeEnum; + } + } + return null; + } +} \ No newline at end of file diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/model/dto/AggregationCountLikeUnlikeCommentMqDTO.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/model/dto/AggregationCountLikeUnlikeCommentMqDTO.java new file mode 100644 index 0000000..3d38af6 --- /dev/null +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/model/dto/AggregationCountLikeUnlikeCommentMqDTO.java @@ -0,0 +1,24 @@ +package com.hanserwei.hannote.count.biz.model.dto; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class AggregationCountLikeUnlikeCommentMqDTO { + + /** + * 评论 ID + */ + private Long commentId; + + /** + * 聚合后的计数 + */ + private Integer count; + +} \ No newline at end of file diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/model/dto/CountLikeUnlikeCommentMqDTO.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/model/dto/CountLikeUnlikeCommentMqDTO.java new file mode 100644 index 0000000..6b910bc --- /dev/null +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/model/dto/CountLikeUnlikeCommentMqDTO.java @@ -0,0 +1,22 @@ +package com.hanserwei.hannote.count.biz.model.dto; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class CountLikeUnlikeCommentMqDTO { + + private Long userId; + + private Long commentId; + + /** + * 0: 取消点赞, 1:点赞 + */ + private Integer type; +} \ No newline at end of file diff --git a/han-note-count/han-note-count-biz/src/main/resources/mapperxml/CommentDOMapper.xml b/han-note-count/han-note-count-biz/src/main/resources/mapperxml/CommentDOMapper.xml index e1074e1..b11ba50 100644 --- a/han-note-count/han-note-count-biz/src/main/resources/mapperxml/CommentDOMapper.xml +++ b/han-note-count/han-note-count-biz/src/main/resources/mapperxml/CommentDOMapper.xml @@ -48,4 +48,11 @@ where id = #{parentId} and level = 1 + + + update t_comment + set like_total = like_total + #{count}, + update_time = now() + where id = #{commentId} + \ No newline at end of file diff --git a/han-note-count/han-note-count-biz/src/test/java/com/hanserwei/hannote/count/biz/TestCommentLikeUnLikeConsumer.java b/han-note-count/han-note-count-biz/src/test/java/com/hanserwei/hannote/count/biz/TestCommentLikeUnLikeConsumer.java new file mode 100644 index 0000000..835748c --- /dev/null +++ b/han-note-count/han-note-count-biz/src/test/java/com/hanserwei/hannote/count/biz/TestCommentLikeUnLikeConsumer.java @@ -0,0 +1,78 @@ +package com.hanserwei.hannote.count.biz; + +import com.hanserwei.framework.common.utils.JsonUtils; +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +import java.time.LocalDateTime; + +@SpringBootTest +public class TestCommentLikeUnLikeConsumer { + + @Autowired + private RocketMQTemplate rocketMQTemplate; + + /** + * 测试:模拟发送评论点赞、取消点赞消息 + */ + @Test + void testBatchSendLikeUnlikeCommentMQ() { + Long userId = 2001L; + Long commentId = 4001L; + + for (long i = 0; i < 32; i++) { + // 构建消息体 DTO + LikeUnlikeCommentMqDTO likeUnlikeCommentMqDTO = LikeUnlikeCommentMqDTO.builder() + .userId(userId) + .commentId(commentId) + .createTime(LocalDateTime.now()) + .build(); + + // 通过冒号连接, 可让 MQ 发送给主题 Topic 时,携带上标签 Tag + String destination = "CommentLikeUnlikeTopic:"; + + if (i % 2 == 0) { // 偶数 + likeUnlikeCommentMqDTO.setType(0); // 取消点赞 + destination = destination + "Unlike"; + } else { // 奇数 + likeUnlikeCommentMqDTO.setType(1); // 点赞 + destination = destination + "Like"; + } + + // MQ 分区键 + String hashKey = String.valueOf(userId); + + // 构建消息对象,并将 DTO 转成 Json 字符串设置到消息体中 + Message message = MessageBuilder.withPayload(JsonUtils.toJsonString(likeUnlikeCommentMqDTO)) + .build(); + + // 同步发送 MQ 消息 + rocketMQTemplate.syncSendOrderly(destination, message, hashKey); + } + } + + @Getter + @Setter + @Builder + static class LikeUnlikeCommentMqDTO { + + private Long userId; + + private Long commentId; + + /** + * 0: 取消点赞, 1:点赞 + */ + private Integer type; + + private LocalDateTime createTime; + + } +} diff --git a/http-client/gateApi.http b/http-client/gateApi.http index 70811d6..94ee0ba 100644 --- a/http-client/gateApi.http +++ b/http-client/gateApi.http @@ -3,7 +3,7 @@ POST http://localhost:8000/auth/verification/code/send Content-Type: application/json { - "email": "2628273921@qq.com" + "email": "ssw010723@gmail.com" } ### 登录/注册 @@ -11,8 +11,8 @@ POST http://localhost:8000/auth/login Content-Type: application/json { - "email": "2628273921@qq.com", - "code": "825004", + "email": "ssw010723@gmail.com", + "code": "116253", "type": 1 } @@ -368,9 +368,19 @@ Authorization: Bearer {{token}} } ### 点赞评论 -POST http://localhost:8093/comment/like +POST http://localhost:8000/comment/comment/like Content-Type: application/json +Authorization: Bearer {{token}} { - "commentId": 4002 + "commentId": 8001 +} + +### 取消点赞评论 +POST http://localhost:8000/comment/comment/unlike +Content-Type: application/json +Authorization: Bearer {{token}} + +{ + "commentId": 8001 }