feat(count): 实现笔记收藏计数功能

- 新增笔记收藏/取消收藏 MQ 消费者,处理收藏计数逻辑
- 新增笔记收藏数落库消费者,实现批量更新数据库
- 新增收藏类型枚举和 DTO 类,用于消息传递与解析
- 修改 MQ 消费组名称前缀统一为 han_note_group_
- 新增 Redis 收藏总数字段常量及更新逻辑
- 扩展 NoteCountDOMapper 支持收藏数插入或更新操作
- 在 XML 映射文件中新增对应 SQL 插入语句
- 完善 MQ 常量定义,增加收藏相关主题常量
This commit is contained in:
2025-10-19 16:06:45 +08:00
parent c036fadbff
commit 564eefa7bc
13 changed files with 306 additions and 7 deletions

View File

@@ -32,4 +32,14 @@ public interface MQConstants {
*/ */
String TOPIC_COUNT_NOTE_LIKE_2_DB = "CountNoteLike2DBTTopic"; String TOPIC_COUNT_NOTE_LIKE_2_DB = "CountNoteLike2DBTTopic";
/**
* Topic: 计数 - 笔记收藏数
*/
String TOPIC_COUNT_NOTE_COLLECT = "CountNoteCollectTopic";
/**
* Topic: 计数 - 笔记收藏数落库
*/
String TOPIC_COUNT_NOTE_COLLECT_2_DB = "CountNoteCollect2DBTTopic";
} }

View File

@@ -26,6 +26,11 @@ public class RedisKeyConstants {
*/ */
private static final String COUNT_NOTE_KEY_PREFIX = "count:note:"; private static final String COUNT_NOTE_KEY_PREFIX = "count:note:";
/**
* Hash Field: 笔记收藏总数
*/
public static final String FIELD_COLLECT_TOTAL = "collectTotal";
/** /**
* 构建用户维度计数 Key * 构建用户维度计数 Key
* *

View File

@@ -15,7 +15,7 @@ import java.util.Map;
@SuppressWarnings("ALL") @SuppressWarnings("ALL")
@Component @Component
@RocketMQMessageListener(consumerGroup = "han_note_" + MQConstants.TOPIC_COUNT_FANS_2_DB, // Group 组 @RocketMQMessageListener(consumerGroup = "han_note_group_" + MQConstants.TOPIC_COUNT_FANS_2_DB, // Group 组
topic = MQConstants.TOPIC_COUNT_FANS_2_DB // 主题 Topic topic = MQConstants.TOPIC_COUNT_FANS_2_DB // 主题 Topic
) )
@Slf4j @Slf4j

View File

@@ -0,0 +1,49 @@
package com.hanserwei.hannote.count.biz.consumer;
import cn.hutool.core.collection.CollUtil;
import com.google.common.util.concurrent.RateLimiter;
import com.hanserwei.framework.common.utils.JsonUtils;
import com.hanserwei.hannote.count.biz.constant.MQConstants;
import com.hanserwei.hannote.count.biz.domain.mapper.NoteCountDOMapper;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@SuppressWarnings("UnstableApiUsage")
@Component
@Slf4j
@RocketMQMessageListener(
consumerGroup = "han_note_group_" + MQConstants.TOPIC_COUNT_NOTE_COLLECT_2_DB,
topic = MQConstants.TOPIC_COUNT_NOTE_COLLECT_2_DB
)
public class CountNoteCollect2DBConsumer implements RocketMQListener<String> {
// 每秒创建 5000 个令牌
private final RateLimiter rateLimiter = RateLimiter.create(5000);
@Resource
private NoteCountDOMapper noteCountDOMapper;
@Override
public void onMessage(String body) {
// 流量削峰:通过获取令牌,如果没有令牌可用,将阻塞,直到获得
rateLimiter.acquire();
log.info("## 消费到了 MQ 【计数: 笔记收藏数入库】, {}...", body);
Map<Long, Integer> countMap = null;
try {
countMap = JsonUtils.parseMap(body, Long.class, Integer.class);
} catch (Exception e) {
log.error("## 解析 JSON 字符串异常", e);
}
if (CollUtil.isNotEmpty(countMap)) {
// 判断数据库中 t_note_count 表,若笔记计数记录不存在,则插入;若记录已存在,则直接更新
countMap.forEach((k, v) -> noteCountDOMapper.insertOrUpdateCollectTotalByNoteId(v, k));
}
}
}

View File

@@ -0,0 +1,118 @@
package com.hanserwei.hannote.count.biz.consumer;
import com.github.phantomthief.collection.BufferTrigger;
import com.google.common.collect.Maps;
import com.hanserwei.framework.common.utils.JsonUtils;
import com.hanserwei.hannote.count.biz.constant.MQConstants;
import com.hanserwei.hannote.count.biz.constant.RedisKeyConstants;
import com.hanserwei.hannote.count.biz.enums.CollectUnCollectNoteTypeEnum;
import com.hanserwei.hannote.count.biz.model.dto.CountCollectUnCollectNoteMqDTO;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
@Component
@Slf4j
@RocketMQMessageListener(
consumerGroup = "han_note_group_" + MQConstants.TOPIC_COUNT_NOTE_COLLECT,
topic = MQConstants.TOPIC_COUNT_NOTE_COLLECT
)
public class CountNoteCollectConsumer implements RocketMQListener<String> {
@Resource
private RedisTemplate<String, Object> redisTemplate;
@Resource
private RocketMQTemplate rocketMQTemplate;
private final BufferTrigger<String> bufferTrigger = BufferTrigger.<String>batchBlocking()
.bufferSize(50000) // 缓存队列的最大容量
.batchSize(1000) // 一批次最多聚合 1000 条
.linger(Duration.ofSeconds(1)) // 多久聚合一次
.setConsumerEx(this::consumeMessage) // 设置消费者方法
.build();
@Override
public void onMessage(String body) {
// 往 bufferTrigger 中添加元素
bufferTrigger.enqueue(body);
}
private void consumeMessage(List<String> bodies) {
log.info("==> 【笔记收藏数】聚合消息, size: {}", bodies.size());
log.info("==> 【笔记收藏数】聚合消息, {}", JsonUtils.toJsonString(bodies));
// List<String> -> List<CountCollectUnCollectNoteMqDTO>
List<CountCollectUnCollectNoteMqDTO> countCollectUnCollectNoteMqDTOS = bodies.stream()
.map(body -> JsonUtils.parseObject(body, CountCollectUnCollectNoteMqDTO.class))
.toList();
// 按笔记ID分组
Map<Long, List<CountCollectUnCollectNoteMqDTO>> groupMap = countCollectUnCollectNoteMqDTOS.stream()
.collect(Collectors.groupingBy(CountCollectUnCollectNoteMqDTO::getNoteId));
// 按组汇总数据,统计出最终的计数
// key 为笔记 ID, value 为最终操作的计数
Map<Long, Integer> countMap = Maps.newHashMap();
for (Map.Entry<Long, List<CountCollectUnCollectNoteMqDTO>> entry : groupMap.entrySet()) {
List<CountCollectUnCollectNoteMqDTO> list = entry.getValue();
// 默认计数为0
int finalCount = 0;
for (CountCollectUnCollectNoteMqDTO countCollectUnCollectNoteMqDTO : list) {
Integer type = countCollectUnCollectNoteMqDTO.getType();
// 获取枚举类
CollectUnCollectNoteTypeEnum collectUnCollectNoteTypeEnum = CollectUnCollectNoteTypeEnum.valueOf(type);
switch (Objects.requireNonNull(collectUnCollectNoteTypeEnum)) {
case COLLECT -> finalCount++;
case UN_COLLECT -> finalCount--;
}
}
// 将分组后统计出的最终计数,存入 countMap 中
countMap.put(entry.getKey(), finalCount);
}
log.info("==> 【笔记收藏数】最终结果, {}", JsonUtils.toJsonString(countMap));
// 更新 Redis
countMap.forEach((k, v) -> {
// Redis Hash Key
String redisKey = RedisKeyConstants.buildCountNoteKey(k);
// 判断 Redis 中 Hash 是否存在
boolean isExisted = redisTemplate.hasKey(redisKey);
// 若存在才会更新
// (因为缓存设有过期时间,考虑到过期后,缓存会被删除,这里需要判断一下,存在才会去更新,而初始化工作放在查询计数来做)
if (isExisted) {
// 对目标用户 Hash 中的收藏总数字段进行计数操作
redisTemplate.opsForHash().increment(redisKey, RedisKeyConstants.FIELD_COLLECT_TOTAL, v);
}
});
// 发送 MQ, 笔记收藏数据落库
Message<String> message = MessageBuilder.withPayload(JsonUtils.toJsonString(countMap))
.build();
// 异步发送 MQ 消息
rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_COLLECT_2_DB, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("==> 【计数服务笔记收藏数入库】MQ 发送成功SendResult: {}", sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("==> 【计数服务笔记收藏数入库】MQ 发送异常: ", throwable);
}
});
}
}

View File

@@ -17,7 +17,7 @@ import java.util.Map;
@Slf4j @Slf4j
@SuppressWarnings({"UnstableApiUsage"}) @SuppressWarnings({"UnstableApiUsage"})
@RocketMQMessageListener( @RocketMQMessageListener(
consumerGroup = "han_note_" + MQConstants.TOPIC_COUNT_NOTE_LIKE_2_DB, consumerGroup = "han_note_group_" + MQConstants.TOPIC_COUNT_NOTE_LIKE_2_DB,
topic = MQConstants.TOPIC_COUNT_NOTE_LIKE_2_DB topic = MQConstants.TOPIC_COUNT_NOTE_LIKE_2_DB
) )
public class CountNoteLike2DBConsumer implements RocketMQListener<String> { public class CountNoteLike2DBConsumer implements RocketMQListener<String> {

View File

@@ -27,7 +27,7 @@ import java.util.stream.Collectors;
@Component @Component
@Slf4j @Slf4j
@RocketMQMessageListener( @RocketMQMessageListener(
consumerGroup = "han_note_" + MQConstants.TOPIC_COUNT_NOTE_LIKE, consumerGroup = "han_note_group_" + MQConstants.TOPIC_COUNT_NOTE_LIKE,
topic = MQConstants.TOPIC_COUNT_NOTE_LIKE topic = MQConstants.TOPIC_COUNT_NOTE_LIKE
) )
public class CountNoteLikeConsumer implements RocketMQListener<String> { public class CountNoteLikeConsumer implements RocketMQListener<String> {

View File

@@ -16,4 +16,13 @@ public interface NoteCountDOMapper extends BaseMapper<NoteCountDO> {
* @return 影响行数 * @return 影响行数
*/ */
int insertOrUpdateLikeTotalByNoteId(@Param("count") Integer count, @Param("noteId") Long noteId); int insertOrUpdateLikeTotalByNoteId(@Param("count") Integer count, @Param("noteId") Long noteId);
/**
* 添加记录或更新笔记收藏数
*
* @param count 收藏数
* @param noteId 笔记ID
* @return 影响行数
*/
int insertOrUpdateCollectTotalByNoteId(@Param("count") Integer count, @Param("noteId") Long noteId);
} }

View File

@@ -0,0 +1,28 @@
package com.hanserwei.hannote.count.biz.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.util.Objects;
@Getter
@AllArgsConstructor
public enum CollectUnCollectNoteTypeEnum {
// 收藏
COLLECT(1),
// 取消收藏
UN_COLLECT(0),
;
private final Integer code;
public static CollectUnCollectNoteTypeEnum valueOf(Integer code) {
for (CollectUnCollectNoteTypeEnum collectUnCollectNoteTypeEnum : CollectUnCollectNoteTypeEnum.values()) {
if (Objects.equals(code, collectUnCollectNoteTypeEnum.getCode())) {
return collectUnCollectNoteTypeEnum;
}
}
return null;
}
}

View File

@@ -0,0 +1,26 @@
package com.hanserwei.hannote.count.biz.model.dto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class CountCollectUnCollectNoteMqDTO {
private Long userId;
private Long noteId;
/**
* 0: 取消收藏, 1收藏
*/
private Integer type;
private LocalDateTime createTime;
}

View File

@@ -20,4 +20,10 @@
VALUES (#{noteId}, #{count}) VALUES (#{noteId}, #{count})
ON DUPLICATE KEY UPDATE like_total = like_total + (#{count}); ON DUPLICATE KEY UPDATE like_total = like_total + (#{count});
</insert> </insert>
<insert id="insertOrUpdateCollectTotalByNoteId" parameterType="map">
INSERT INTO t_note_count (note_id, collect_total)
VALUES (#{noteId}, #{count})
ON DUPLICATE KEY UPDATE collect_total = collect_total + (#{count});
</insert>
</mapper> </mapper>

View File

@@ -8,10 +8,14 @@ import com.hanserwei.hannote.note.biz.domain.mapper.NoteCollectionDOMapper;
import com.hanserwei.hannote.note.biz.model.dto.CollectUnCollectNoteMqDTO; import com.hanserwei.hannote.note.biz.model.dto.CollectUnCollectNoteMqDTO;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.time.LocalDateTime; import java.time.LocalDateTime;
@@ -21,7 +25,7 @@ import java.util.Objects;
@Component @Component
@Slf4j @Slf4j
@RocketMQMessageListener( @RocketMQMessageListener(
consumerGroup = "han_note_" + MQConstants.TOPIC_COLLECT_OR_UN_COLLECT, consumerGroup = "han_note_group_" + MQConstants.TOPIC_COLLECT_OR_UN_COLLECT,
topic = MQConstants.TOPIC_COLLECT_OR_UN_COLLECT, topic = MQConstants.TOPIC_COLLECT_OR_UN_COLLECT,
consumeMode = ConsumeMode.ORDERLY consumeMode = ConsumeMode.ORDERLY
) )
@@ -31,6 +35,8 @@ public class CollectUnCollectNoteConsumer implements RocketMQListener<Message> {
private final RateLimiter rateLimiter = RateLimiter.create(5000); private final RateLimiter rateLimiter = RateLimiter.create(5000);
@Resource @Resource
private NoteCollectionDOMapper noteCollectionDOMapper; private NoteCollectionDOMapper noteCollectionDOMapper;
@Resource
private RocketMQTemplate rocketMQTemplate;
@Override @Override
public void onMessage(Message message) { public void onMessage(Message message) {
@@ -85,7 +91,27 @@ public class CollectUnCollectNoteConsumer implements RocketMQListener<Message> {
// 取消收藏:记录更新 // 取消收藏:记录更新
int count = noteCollectionDOMapper.update2UnCollectByUserIdAndNoteId(noteCollectionDO); int count = noteCollectionDOMapper.update2UnCollectByUserIdAndNoteId(noteCollectionDO);
// TODO: 发送计数 MQ if (count == 0) {
return;
}
// 更新数据库成功后,发送计数 MQ
org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(bodyJsonStr)
.build();
// 异步发送 MQ 消息
rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_COLLECT, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("==> 【计数: 笔记取消收藏】MQ 发送成功SendResult: {}", sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("==> 【计数: 笔记取消收藏】MQ 发送异常: ", throwable);
}
});
} }
/** /**
@@ -119,6 +145,28 @@ public class CollectUnCollectNoteConsumer implements RocketMQListener<Message> {
// 添加或更新笔记收藏记录 // 添加或更新笔记收藏记录
boolean isSuccess = noteCollectionDOMapper.insertOrUpdate(noteCollectionDO); boolean isSuccess = noteCollectionDOMapper.insertOrUpdate(noteCollectionDO);
// TODO: 发送计数 MQ if (!isSuccess) {
return;
}
// 发送计数 MQ
// 更新数据库成功后,发送计数 MQ
org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(bodyJsonStr)
.build();
// 异步发送 MQ 消息
// 异步发送 MQ 消息
rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_COLLECT, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("==> 【计数: 笔记收藏】MQ 发送成功SendResult: {}", sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("==> 【计数: 笔记收藏】MQ 发送异常: ", throwable);
}
});
} }
} }

View File

@@ -27,7 +27,7 @@ import java.util.Objects;
@SuppressWarnings({"UnstableApiUsage"}) @SuppressWarnings({"UnstableApiUsage"})
@Component @Component
@RocketMQMessageListener( @RocketMQMessageListener(
consumerGroup = "han_note_" + MQConstants.TOPIC_LIKE_OR_UNLIKE, consumerGroup = "han_note_group_" + MQConstants.TOPIC_LIKE_OR_UNLIKE,
topic = MQConstants.TOPIC_LIKE_OR_UNLIKE, topic = MQConstants.TOPIC_LIKE_OR_UNLIKE,
consumeMode = ConsumeMode.ORDERLY// 顺序消费 consumeMode = ConsumeMode.ORDERLY// 顺序消费
) )