feat(count): 实现笔记点赞计数功能

- 新增笔记点赞计数 MQ DTO 类
- 实现笔记点赞计数消费者,支持流量削峰与批量聚合
- 实现笔记点赞计数落库消费者,带限流处理
- 新增笔记点赞类型枚举类
- 添加笔记点赞相关 MQ Topic 常量定义
- 扩展笔记计数 Mapper,支持点赞数更新
- 新增 Redis 笔记计数 Key 构建方法及字段常量
- 在笔记服务中发送点赞计数 MQ 消息
This commit is contained in:
2025-10-18 16:16:18 +08:00
parent cfcd12be0d
commit 54c34706fb
10 changed files with 316 additions and 2 deletions

View File

@@ -22,4 +22,14 @@ public interface MQConstants {
*/
String TOPIC_COUNT_FOLLOWING_2_DB = "CountFollowing2DBTopic";
/**
* Topic: 计数 - 笔记点赞数
*/
String TOPIC_COUNT_NOTE_LIKE = "CountNoteLikeTopic";
/**
* Topic: 计数 - 笔记点赞数落库
*/
String TOPIC_COUNT_NOTE_LIKE_2_DB = "CountNoteLike2DBTTopic";
}

View File

@@ -6,15 +6,26 @@ public class RedisKeyConstants {
* Hash Field: 粉丝总数
*/
public static final String FIELD_FANS_TOTAL = "fansTotal";
/**
* Hash Field: 关注总数
*/
public static final String FIELD_FOLLOWING_TOTAL = "followingTotal";
/**
* 用户维度计数 Key 前缀
*/
private static final String COUNT_USER_KEY_PREFIX = "count:user:";
/**
* Hash Field: 笔记点赞总数
*/
public static final String FIELD_LIKE_TOTAL = "likeTotal";
/**
* 笔记维度计数 Key 前缀
*/
private static final String COUNT_NOTE_KEY_PREFIX = "count:note:";
/**
* 构建用户维度计数 Key
*
@@ -25,5 +36,15 @@ public class RedisKeyConstants {
return COUNT_USER_KEY_PREFIX + userId;
}
/**
* 构建笔记维度计数 Key
*
* @param noteId 笔记ID
* @return 笔记维度计数 Key
*/
public static String buildCountNoteKey(Long noteId) {
return COUNT_NOTE_KEY_PREFIX + noteId;
}
}

View File

@@ -0,0 +1,49 @@
package com.hanserwei.hannote.count.biz.consumer;
import cn.hutool.core.collection.CollUtil;
import com.google.common.util.concurrent.RateLimiter;
import com.hanserwei.framework.common.utils.JsonUtils;
import com.hanserwei.hannote.count.biz.constant.MQConstants;
import com.hanserwei.hannote.count.biz.domain.mapper.NoteCountDOMapper;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@Slf4j
@SuppressWarnings({"UnstableApiUsage"})
@RocketMQMessageListener(
consumerGroup = "han_note_" + MQConstants.TOPIC_COUNT_NOTE_LIKE_2_DB,
topic = MQConstants.TOPIC_COUNT_NOTE_LIKE_2_DB
)
public class CountNoteLike2DBConsumer implements RocketMQListener<String> {
// 每秒创建 5000 个令牌
private final RateLimiter rateLimiter = RateLimiter.create(5000);
@Resource
private NoteCountDOMapper noteCountDOMapper;
@Override
public void onMessage(String body) {
// 流量削峰:通过获取令牌,如果没有令牌可用,将阻塞,直到获得
rateLimiter.acquire();
log.info("## 消费到了 MQ 【计数: 笔记点赞数入库】, {}...", body);
Map<Long, Integer> countMap = null;
try {
countMap = JsonUtils.parseMap(body, Long.class, Integer.class);
} catch (Exception e) {
log.error("## 解析 JSON 字符串异常", e);
}
if (CollUtil.isNotEmpty(countMap)) {
// 判断数据库中 t_note_count 表,若笔记计数记录不存在,则插入;若记录已存在,则直接更新
countMap.forEach((k, v) -> noteCountDOMapper.insertOrUpdateLikeTotalByNoteId(v, k));
}
}
}

View File

@@ -0,0 +1,115 @@
package com.hanserwei.hannote.count.biz.consumer;
import com.github.phantomthief.collection.BufferTrigger;
import com.google.common.collect.Maps;
import com.hanserwei.framework.common.utils.JsonUtils;
import com.hanserwei.hannote.count.biz.constant.MQConstants;
import com.hanserwei.hannote.count.biz.constant.RedisKeyConstants;
import com.hanserwei.hannote.count.biz.enums.LikeUnlikeNoteTypeEnum;
import com.hanserwei.hannote.count.biz.model.dto.CountLikeUnlikeNoteMqDTO;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Component
@Slf4j
@RocketMQMessageListener(
consumerGroup = "han_note_" + MQConstants.TOPIC_COUNT_NOTE_LIKE,
topic = MQConstants.TOPIC_COUNT_NOTE_LIKE
)
public class CountNoteLikeConsumer implements RocketMQListener<String> {
@Resource
private RedisTemplate<String, Object> redisTemplate;
@Resource
private RocketMQTemplate rocketMQTemplate;
private final BufferTrigger<String> bufferTrigger = BufferTrigger.<String>batchBlocking()
.bufferSize(50000) // 缓存队列的最大容量
.batchSize(1000) // 一批次最多聚合 1000 条
.linger(Duration.ofSeconds(1)) // 多久聚合一次
.setConsumerEx(this::consumeMessage) // 设置消费者方法
.build();
@Override
public void onMessage(String body) {
// 往 bufferTrigger 中添加元素
bufferTrigger.enqueue(body);
}
private void consumeMessage(List<String> bodies) {
log.info("==> 【笔记点赞数】聚合消息, size: {}", bodies.size());
log.info("==> 【笔记点赞数】聚合消息, {}", JsonUtils.toJsonString(bodies));
List<CountLikeUnlikeNoteMqDTO> countLikeUnlikeNoteMqDTOS = bodies.stream()
.map(body -> JsonUtils.parseObject(body, CountLikeUnlikeNoteMqDTO.class)).toList();
// 按笔记ID分组
Map<Long, List<CountLikeUnlikeNoteMqDTO>> groupMap = countLikeUnlikeNoteMqDTOS.stream()
.collect(Collectors.groupingBy(CountLikeUnlikeNoteMqDTO::getNoteId));
// 按组汇总统计处最终计数
// key为笔记IDvalue为最终操作计数
Map<Long, Integer> countMap = Maps.newHashMap();
for (Map.Entry<Long, List<CountLikeUnlikeNoteMqDTO>> entry : groupMap.entrySet()) {
List<CountLikeUnlikeNoteMqDTO> list = entry.getValue();
// 最终计数默认为0
int finalCount = 0;
for (CountLikeUnlikeNoteMqDTO countLikeUnlikeNoteMqDTO : list) {
Integer type = countLikeUnlikeNoteMqDTO.getType();
LikeUnlikeNoteTypeEnum likeUnlikeNoteTypeEnum = LikeUnlikeNoteTypeEnum.valueOf(type);
if (likeUnlikeNoteTypeEnum == null) {
continue;
}
switch (likeUnlikeNoteTypeEnum) {
case LIKE -> finalCount++;
case UNLIKE -> finalCount--;
}
}
countMap.put(entry.getKey(), finalCount);
}
log.info("## 【笔记点赞数】聚合后的计数数据: {}", JsonUtils.toJsonString(countMap));
// 更新Redis
countMap.forEach((k, v) -> {
// Redis Key
String redisKey = RedisKeyConstants.buildCountNoteKey(k);
// 判断 Redis 中 Hash 是否存在
boolean isExisted = redisTemplate.hasKey(redisKey);
// 若存在才会更新
// (因为缓存设有过期时间,考虑到过期后,缓存会被删除,这里需要判断一下,存在才会去更新,而初始化工作放在查询计数来做)
if (isExisted) {
// 对目标用户 Hash 中的点赞数字段进行计数操作
redisTemplate.opsForHash().increment(redisKey, RedisKeyConstants.FIELD_LIKE_TOTAL, v);
}
});
// 发送 MQ, 笔记点赞数据落库
Message<String> message = MessageBuilder.withPayload(JsonUtils.toJsonString(countMap))
.build();
// 异步发送 MQ 消息
rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_LIKE_2_DB, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("==> 【计数服务笔记点赞数入库】MQ 发送成功SendResult: {}", sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("==> 【计数服务笔记点赞数入库】MQ 发送异常: ", throwable);
}
});
}
}

View File

@@ -3,7 +3,17 @@ package com.hanserwei.hannote.count.biz.domain.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.hanserwei.hannote.count.biz.domain.dataobject.NoteCountDO;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
@Mapper
public interface NoteCountDOMapper extends BaseMapper<NoteCountDO> {
/**
* 添加笔记计数记录或更新笔记点赞数
*
* @param count 计数
* @param noteId 笔记ID
* @return 影响行数
*/
int insertOrUpdateLikeTotalByNoteId(@Param("count") Integer count, @Param("noteId") Long noteId);
}

View File

@@ -0,0 +1,28 @@
package com.hanserwei.hannote.count.biz.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.util.Objects;
@Getter
@AllArgsConstructor
public enum LikeUnlikeNoteTypeEnum {
// 点赞
LIKE(1),
// 取消点赞
UNLIKE(0),
;
private final Integer code;
public static LikeUnlikeNoteTypeEnum valueOf(Integer code) {
for (LikeUnlikeNoteTypeEnum likeUnlikeNoteTypeEnum : LikeUnlikeNoteTypeEnum.values()) {
if (Objects.equals(code, likeUnlikeNoteTypeEnum.getCode())) {
return likeUnlikeNoteTypeEnum;
}
}
return null;
}
}

View File

@@ -0,0 +1,26 @@
package com.hanserwei.hannote.count.biz.model.dto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class CountLikeUnlikeNoteMqDTO {
private Long userId;
private Long noteId;
/**
* 0: 取消点赞, 1点赞
*/
private Integer type;
private LocalDateTime createTime;
}

View File

@@ -14,4 +14,10 @@
<!--@mbg.generated-->
id, note_id, like_total, collect_total, comment_total
</sql>
<insert id="insertOrUpdateLikeTotalByNoteId" parameterType="map">
INSERT INTO t_note_count (note_id, like_total)
VALUES (#{noteId}, #{count})
ON DUPLICATE KEY UPDATE like_total = like_total + (#{count});
</insert>
</mapper>

View File

@@ -11,10 +11,14 @@ import com.hanserwei.hannote.note.biz.model.dto.LikeUnlikeNoteMqDTO;
import com.hanserwei.hannote.note.biz.service.NoteLikeDOService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
@@ -36,6 +40,8 @@ public class LikeUnlikeNoteConsumer implements RocketMQListener<Message> {
private NoteLikeDOMapper noteLikeDOMapper;
@Resource
private NoteLikeDOService noteLikeDOService;
@Resource
private RocketMQTemplate rocketMQTemplate;
@Override
public void onMessage(Message message) {
@@ -94,7 +100,25 @@ public class LikeUnlikeNoteConsumer implements RocketMQListener<Message> {
// 执行更新
boolean update = noteLikeDOService.update(updateEntity, wrapper);
// TODO: 删除计数
if (!update) {
return;
}
// 更新数据库成功后,发送计数 MQ
org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(bodyJsonStr)
.build();
// 异步发送 MQ 消息
rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_LIKE, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("==> 【计数: 笔记取消点赞】MQ 发送成功SendResult: {}", sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("==> 【计数: 笔记取消点赞】MQ 发送异常: ", throwable);
}
});
}
/**
@@ -128,7 +152,27 @@ public class LikeUnlikeNoteConsumer implements RocketMQListener<Message> {
// 添加或更新笔记点赞记录
boolean count = noteLikeDOMapper.insertOrUpdate(noteLikeDO);
// TODO: 发送计数 MQ
if (!count) {
return;
}
// 发送计数 MQ
// 更新数据库成功后,发送计数 MQ
org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(bodyJsonStr)
.build();
// 异步发送 MQ 消息
rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_LIKE, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("==> 【计数: 笔记点赞】MQ 发送成功SendResult: {}", sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("==> 【计数: 笔记点赞】MQ 发送异常: ", throwable);
}
});
}
}

View File

@@ -17,6 +17,11 @@ public interface MQConstants {
*/
String TOPIC_LIKE_OR_UNLIKE = "LikeUnlikeTopic";
/**
* Topic: 计数 - 笔记点赞数
*/
String TOPIC_COUNT_NOTE_LIKE = "CountNoteLikeTopic";
/**
* 点赞标签
*/