feat(comment): 计数服务:评论点赞数更新,取消点赞接口
- 新增取消点赞接口 /comment/unlike - 添加布隆过滤器校验评论是否已点赞 - 实现取消点赞时从布隆过滤器中移除记录 - 发送取消点赞消息到 RocketMQ 进行异步处理 - 新增取消点赞相关枚举和异常码 - 更新计数服务消费点赞/取消点赞消息逻辑 - 支持评论点赞数的增减与持久化更新 - 添加 HTTP 客户端测试用例
This commit is contained in:
@@ -57,6 +57,16 @@ public interface MQConstants {
|
||||
*/
|
||||
String TOPIC_NOTE_OPERATE = "NoteOperateTopic";
|
||||
|
||||
/**
|
||||
* Topic: 评论点赞数更新
|
||||
*/
|
||||
String TOPIC_COMMENT_LIKE_OR_UNLIKE = "CommentLikeUnlikeTopic";
|
||||
|
||||
/**
|
||||
* Topic: 计数 - 评论点赞数落库
|
||||
*/
|
||||
String TOPIC_COUNT_COMMENT_LIKE_2_DB = "CountCommentLike2DBTTopic";
|
||||
|
||||
/**
|
||||
* Tag 标签:笔记发布
|
||||
*/
|
||||
|
||||
@@ -0,0 +1,55 @@
|
||||
package com.hanserwei.hannote.count.biz.consumer;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import com.google.common.util.concurrent.RateLimiter;
|
||||
import com.hanserwei.framework.common.utils.JsonUtils;
|
||||
import com.hanserwei.hannote.count.biz.constant.MQConstants;
|
||||
import com.hanserwei.hannote.count.biz.domain.mapper.CommentDOMapper;
|
||||
import com.hanserwei.hannote.count.biz.model.dto.AggregationCountLikeUnlikeCommentMqDTO;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@SuppressWarnings("ALL")
|
||||
@Component
|
||||
@RocketMQMessageListener(consumerGroup = "han_note_group_" + MQConstants.TOPIC_COUNT_COMMENT_LIKE_2_DB, // Group 组
|
||||
topic = MQConstants.TOPIC_COUNT_COMMENT_LIKE_2_DB // 主题 Topic
|
||||
)
|
||||
@Slf4j
|
||||
public class CountCommentLike2DBConsumer implements RocketMQListener<String> {
|
||||
|
||||
// 每秒创建 5000 个令牌
|
||||
private final RateLimiter rateLimiter = RateLimiter.create(5000);
|
||||
@Resource
|
||||
private CommentDOMapper commentDOMapper;
|
||||
|
||||
@Override
|
||||
public void onMessage(String body) {
|
||||
// 流量削峰:通过获取令牌,如果没有令牌可用,将阻塞,直到获得
|
||||
rateLimiter.acquire();
|
||||
|
||||
log.info("## 消费到了 MQ 【计数: 评论点赞数入库】, {}...", body);
|
||||
|
||||
List<AggregationCountLikeUnlikeCommentMqDTO> countList = null;
|
||||
try {
|
||||
countList = JsonUtils.parseList(body, AggregationCountLikeUnlikeCommentMqDTO.class);
|
||||
} catch (Exception e) {
|
||||
log.error("## 解析 JSON 字符串异常", e);
|
||||
}
|
||||
|
||||
if (CollUtil.isNotEmpty(countList)) {
|
||||
// 更新评论点赞数
|
||||
countList.forEach(item -> {
|
||||
Long commentId = item.getCommentId();
|
||||
Integer count = item.getCount();
|
||||
|
||||
commentDOMapper.updateLikeTotalByCommentId(count, commentId);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,138 @@
|
||||
package com.hanserwei.hannote.count.biz.consumer;
|
||||
|
||||
import com.github.phantomthief.collection.BufferTrigger;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.hanserwei.framework.common.utils.JsonUtils;
|
||||
import com.hanserwei.hannote.count.biz.constant.MQConstants;
|
||||
import com.hanserwei.hannote.count.biz.constant.RedisKeyConstants;
|
||||
import com.hanserwei.hannote.count.biz.enums.LikeUnlikeCommentTypeEnum;
|
||||
import com.hanserwei.hannote.count.biz.model.dto.AggregationCountLikeUnlikeCommentMqDTO;
|
||||
import com.hanserwei.hannote.count.biz.model.dto.CountLikeUnlikeCommentMqDTO;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.client.producer.SendCallback;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Component
|
||||
@RocketMQMessageListener(consumerGroup = "han_note_group_count_" + MQConstants.TOPIC_COMMENT_LIKE_OR_UNLIKE, // Group 组
|
||||
topic = MQConstants.TOPIC_COMMENT_LIKE_OR_UNLIKE // 主题 Topic
|
||||
)
|
||||
@Slf4j
|
||||
public class CountCommentLikeConsumer implements RocketMQListener<String> {
|
||||
|
||||
@Resource
|
||||
private RedisTemplate<String, Object> redisTemplate;
|
||||
@Resource
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
private final BufferTrigger<String> bufferTrigger = BufferTrigger.<String>batchBlocking()
|
||||
.bufferSize(50000) // 缓存队列的最大容量
|
||||
.batchSize(1000) // 一批次最多聚合 1000 条
|
||||
.linger(Duration.ofSeconds(1)) // 多久聚合一次
|
||||
.setConsumerEx(this::consumeMessage) // 设置消费者方法
|
||||
.build();
|
||||
|
||||
@Override
|
||||
public void onMessage(String body) {
|
||||
// 往 bufferTrigger 中添加元素
|
||||
bufferTrigger.enqueue(body);
|
||||
}
|
||||
|
||||
private void consumeMessage(List<String> bodys) {
|
||||
log.info("==> 【评论点赞数】聚合消息, size: {}", bodys.size());
|
||||
log.info("==> 【评论点赞数】聚合消息, {}", JsonUtils.toJsonString(bodys));
|
||||
|
||||
// List<String> 转 List<CountLikeUnlikeCommentMqDTO>
|
||||
List<CountLikeUnlikeCommentMqDTO> countLikeUnlikeCommentMqDTOS = bodys.stream()
|
||||
.map(body -> JsonUtils.parseObject(body, CountLikeUnlikeCommentMqDTO.class)).toList();
|
||||
|
||||
// 按评论 ID 进行分组
|
||||
Map<Long, List<CountLikeUnlikeCommentMqDTO>> groupMap = countLikeUnlikeCommentMqDTOS.stream()
|
||||
.collect(Collectors.groupingBy(CountLikeUnlikeCommentMqDTO::getCommentId));
|
||||
|
||||
// 按组汇总数据,统计出最终的计数
|
||||
// 最终操作的计数对象
|
||||
List<AggregationCountLikeUnlikeCommentMqDTO> countList = Lists.newArrayList();
|
||||
|
||||
for (Map.Entry<Long, List<CountLikeUnlikeCommentMqDTO>> entry : groupMap.entrySet()) {
|
||||
// 评论 ID
|
||||
Long commentId = entry.getKey();
|
||||
|
||||
List<CountLikeUnlikeCommentMqDTO> list = entry.getValue();
|
||||
// 最终的计数值,默认为 0
|
||||
int finalCount = 0;
|
||||
for (CountLikeUnlikeCommentMqDTO countLikeUnlikeCommentMqDTO : list) {
|
||||
// 获取操作类型
|
||||
Integer type = countLikeUnlikeCommentMqDTO.getType();
|
||||
|
||||
// 根据操作类型,获取对应枚举
|
||||
LikeUnlikeCommentTypeEnum likeUnlikeCommentTypeEnum = LikeUnlikeCommentTypeEnum.valueOf(type);
|
||||
|
||||
// 若枚举为空,跳到下一次循环
|
||||
if (Objects.isNull(likeUnlikeCommentTypeEnum)) continue;
|
||||
|
||||
switch (likeUnlikeCommentTypeEnum) {
|
||||
case LIKE -> finalCount += 1; // 如果为点赞操作,点赞数 +1
|
||||
case UNLIKE -> finalCount -= 1; // 如果为取消点赞操作,点赞数 -1
|
||||
}
|
||||
}
|
||||
// 将分组后统计出的最终计数,存入 countList 中
|
||||
countList.add(AggregationCountLikeUnlikeCommentMqDTO.builder()
|
||||
.commentId(commentId)
|
||||
.count(finalCount)
|
||||
.build());
|
||||
}
|
||||
|
||||
log.info("## 【评论点赞数】聚合后的计数数据: {}", JsonUtils.toJsonString(countList));
|
||||
|
||||
// 更新 Redis
|
||||
countList.forEach(item -> {
|
||||
// 评论 ID
|
||||
Long commentId = item.getCommentId();
|
||||
// 聚合后的计数
|
||||
Integer count = item.getCount();
|
||||
|
||||
// Redis 中评论计数 Hash Key
|
||||
String countCommentRedisKey = RedisKeyConstants.buildCountCommentKey(commentId);
|
||||
// 判断 Redis 中 Hash 是否存在
|
||||
boolean isCountCommentExisted = redisTemplate.hasKey(countCommentRedisKey);
|
||||
|
||||
// 若存在才会更新
|
||||
// (因为缓存设有过期时间,考虑到过期后,缓存会被删除,这里需要判断一下,存在才会去更新,而初始化工作放在查询计数来做)
|
||||
if (isCountCommentExisted) {
|
||||
// 对目标用户 Hash 中的点赞数字段进行计数操作
|
||||
redisTemplate.opsForHash().increment(countCommentRedisKey, RedisKeyConstants.FIELD_LIKE_TOTAL, count);
|
||||
}
|
||||
});
|
||||
|
||||
// 发送 MQ, 评论点赞数据落库
|
||||
Message<String> message = MessageBuilder.withPayload(JsonUtils.toJsonString(countList))
|
||||
.build();
|
||||
|
||||
// 异步发送 MQ 消息
|
||||
rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_COMMENT_LIKE_2_DB, message, new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
log.info("==> 【计数服务:评论点赞数写库】MQ 发送成功,SendResult: {}", sendResult);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable throwable) {
|
||||
log.error("==> 【计数服务:评论点赞数写库】MQ 发送异常: ", throwable);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -16,4 +16,14 @@ public interface CommentDOMapper extends BaseMapper<CommentDO> {
|
||||
* @return 更新结果
|
||||
*/
|
||||
int updateChildCommentTotal(@Param("parentId") Long parentId, @Param("count") int count);
|
||||
|
||||
/**
|
||||
* 更新评论点赞数
|
||||
*
|
||||
* @param count 计数
|
||||
* @param commentId 评论 ID
|
||||
* @return 更新结果
|
||||
*/
|
||||
int updateLikeTotalByCommentId(@Param("count") Integer count,
|
||||
@Param("commentId") Long commentId);
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
package com.hanserwei.hannote.count.biz.enums;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public enum LikeUnlikeCommentTypeEnum {
|
||||
// 点赞
|
||||
LIKE(1),
|
||||
// 取消点赞
|
||||
UNLIKE(0),
|
||||
;
|
||||
|
||||
private final Integer code;
|
||||
|
||||
public static LikeUnlikeCommentTypeEnum valueOf(Integer code) {
|
||||
for (LikeUnlikeCommentTypeEnum likeUnlikeCommentTypeEnum : LikeUnlikeCommentTypeEnum.values()) {
|
||||
if (Objects.equals(code, likeUnlikeCommentTypeEnum.getCode())) {
|
||||
return likeUnlikeCommentTypeEnum;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,24 @@
|
||||
package com.hanserwei.hannote.count.biz.model.dto;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Builder
|
||||
public class AggregationCountLikeUnlikeCommentMqDTO {
|
||||
|
||||
/**
|
||||
* 评论 ID
|
||||
*/
|
||||
private Long commentId;
|
||||
|
||||
/**
|
||||
* 聚合后的计数
|
||||
*/
|
||||
private Integer count;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
package com.hanserwei.hannote.count.biz.model.dto;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Builder
|
||||
public class CountLikeUnlikeCommentMqDTO {
|
||||
|
||||
private Long userId;
|
||||
|
||||
private Long commentId;
|
||||
|
||||
/**
|
||||
* 0: 取消点赞, 1:点赞
|
||||
*/
|
||||
private Integer type;
|
||||
}
|
||||
@@ -48,4 +48,11 @@
|
||||
where id = #{parentId}
|
||||
and level = 1
|
||||
</update>
|
||||
|
||||
<update id="updateLikeTotalByCommentId" parameterType="map">
|
||||
update t_comment
|
||||
set like_total = like_total + #{count},
|
||||
update_time = now()
|
||||
where id = #{commentId}
|
||||
</update>
|
||||
</mapper>
|
||||
@@ -0,0 +1,78 @@
|
||||
package com.hanserwei.hannote.count.biz;
|
||||
|
||||
import com.hanserwei.framework.common.utils.JsonUtils;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
@SpringBootTest
|
||||
public class TestCommentLikeUnLikeConsumer {
|
||||
|
||||
@Autowired
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
/**
|
||||
* 测试:模拟发送评论点赞、取消点赞消息
|
||||
*/
|
||||
@Test
|
||||
void testBatchSendLikeUnlikeCommentMQ() {
|
||||
Long userId = 2001L;
|
||||
Long commentId = 4001L;
|
||||
|
||||
for (long i = 0; i < 32; i++) {
|
||||
// 构建消息体 DTO
|
||||
LikeUnlikeCommentMqDTO likeUnlikeCommentMqDTO = LikeUnlikeCommentMqDTO.builder()
|
||||
.userId(userId)
|
||||
.commentId(commentId)
|
||||
.createTime(LocalDateTime.now())
|
||||
.build();
|
||||
|
||||
// 通过冒号连接, 可让 MQ 发送给主题 Topic 时,携带上标签 Tag
|
||||
String destination = "CommentLikeUnlikeTopic:";
|
||||
|
||||
if (i % 2 == 0) { // 偶数
|
||||
likeUnlikeCommentMqDTO.setType(0); // 取消点赞
|
||||
destination = destination + "Unlike";
|
||||
} else { // 奇数
|
||||
likeUnlikeCommentMqDTO.setType(1); // 点赞
|
||||
destination = destination + "Like";
|
||||
}
|
||||
|
||||
// MQ 分区键
|
||||
String hashKey = String.valueOf(userId);
|
||||
|
||||
// 构建消息对象,并将 DTO 转成 Json 字符串设置到消息体中
|
||||
Message<String> message = MessageBuilder.withPayload(JsonUtils.toJsonString(likeUnlikeCommentMqDTO))
|
||||
.build();
|
||||
|
||||
// 同步发送 MQ 消息
|
||||
rocketMQTemplate.syncSendOrderly(destination, message, hashKey);
|
||||
}
|
||||
}
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
@Builder
|
||||
static class LikeUnlikeCommentMqDTO {
|
||||
|
||||
private Long userId;
|
||||
|
||||
private Long commentId;
|
||||
|
||||
/**
|
||||
* 0: 取消点赞, 1:点赞
|
||||
*/
|
||||
private Integer type;
|
||||
|
||||
private LocalDateTime createTime;
|
||||
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user