feat(data-align): 新增笔记收藏数据对齐功能
- 新增布隆过滤器Lua脚本,用于日增量笔记收藏数据去重 - 新增收藏/取消收藏MQ消息DTO定义- 新增笔记收藏数和用户获得收藏数的数据库插入接口及XML配置 - 新增笔记收藏数MQ Topic常量定义 - 优化笔记服务中布隆过滤器异步初始化逻辑-修复取消收藏时Lua脚本路径错误问题 - 增强取消收藏操作的幂等性校验 - 新增笔记收藏数据对齐消费者,实现增量数据落库和布隆过滤器更新 - 新增笔记收藏布隆过滤器相关常量和工具方法
This commit is contained in:
@@ -7,4 +7,9 @@ public interface MQConstants {
|
|||||||
*/
|
*/
|
||||||
String TOPIC_COUNT_NOTE_LIKE = "CountNoteLikeTopic";
|
String TOPIC_COUNT_NOTE_LIKE = "CountNoteLikeTopic";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Topic: 计数 - 笔记收藏数
|
||||||
|
*/
|
||||||
|
String TOPIC_COUNT_NOTE_COLLECT = "CountNoteCollectTopic";
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -7,6 +7,11 @@ public class RedisKeyConstants {
|
|||||||
*/
|
*/
|
||||||
public static final String BLOOM_TODAY_NOTE_LIKE_LIST_KEY = "bloom:dataAlign:note:likes:";
|
public static final String BLOOM_TODAY_NOTE_LIKE_LIST_KEY = "bloom:dataAlign:note:likes:";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 布隆过滤器:日增量变更数据,用户笔记收藏,取消收藏 前缀
|
||||||
|
*/
|
||||||
|
public static final String BLOOM_TODAY_NOTE_COLLECT_LIST_KEY = "bloom:dataAlign:note:collects:";
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 构建完整的布隆过滤器:日增量变更数据,用户笔记点赞,取消点赞 KEY
|
* 构建完整的布隆过滤器:日增量变更数据,用户笔记点赞,取消点赞 KEY
|
||||||
@@ -18,4 +23,14 @@ public class RedisKeyConstants {
|
|||||||
return BLOOM_TODAY_NOTE_LIKE_LIST_KEY + date;
|
return BLOOM_TODAY_NOTE_LIKE_LIST_KEY + date;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 构建完整的布隆过滤器:日增量变更数据,用户笔记收藏,取消收藏 KEY
|
||||||
|
*
|
||||||
|
* @param date 日期
|
||||||
|
* @return 完整的布隆过滤器:日增量变更数据,用户笔记收藏,取消收藏 KEY
|
||||||
|
*/
|
||||||
|
public static String buildBloomUserNoteCollectListKey(String date) {
|
||||||
|
return BLOOM_TODAY_NOTE_COLLECT_LIST_KEY + date;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -0,0 +1,111 @@
|
|||||||
|
package com.hanserwei.hannote.data.align.consumer;
|
||||||
|
|
||||||
|
import com.hanserwei.framework.common.utils.JsonUtils;
|
||||||
|
import com.hanserwei.hannote.data.align.constant.MQConstants;
|
||||||
|
import com.hanserwei.hannote.data.align.constant.RedisKeyConstants;
|
||||||
|
import com.hanserwei.hannote.data.align.constant.TableConstants;
|
||||||
|
import com.hanserwei.hannote.data.align.domain.mapper.InsertRecordMapper;
|
||||||
|
import com.hanserwei.hannote.data.align.model.vo.CollectUnCollectNoteMqDTO;
|
||||||
|
import jakarta.annotation.Resource;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||||
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.core.io.ClassPathResource;
|
||||||
|
import org.springframework.data.redis.core.RedisTemplate;
|
||||||
|
import org.springframework.data.redis.core.script.DefaultRedisScript;
|
||||||
|
import org.springframework.data.redis.core.script.RedisScript;
|
||||||
|
import org.springframework.scripting.support.ResourceScriptSource;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.transaction.support.TransactionTemplate;
|
||||||
|
|
||||||
|
import java.time.LocalDate;
|
||||||
|
import java.time.format.DateTimeFormatter;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@Slf4j
|
||||||
|
@RocketMQMessageListener(
|
||||||
|
consumerGroup = "han_note_group_data_align_" + MQConstants.TOPIC_COUNT_NOTE_COLLECT,
|
||||||
|
topic = MQConstants.TOPIC_COUNT_NOTE_COLLECT
|
||||||
|
)
|
||||||
|
public class TodayNoteCollectIncrementData2DBConsumer implements RocketMQListener<String> {
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private RedisTemplate<String, Object> redisTemplate;
|
||||||
|
@Resource
|
||||||
|
private TransactionTemplate transactionTemplate;
|
||||||
|
@Resource
|
||||||
|
private InsertRecordMapper insertRecordMapper;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 表总分片数
|
||||||
|
*/
|
||||||
|
@Value("${table.shards}")
|
||||||
|
private int tableShards;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onMessage(String body) {
|
||||||
|
log.info("## TodayNoteCollectIncrementData2DBConsumer 消费到了 MQ: {}", body);
|
||||||
|
// 1. 布隆过滤器判断该日增量数据是否已经记录
|
||||||
|
// 消息体转DTO
|
||||||
|
CollectUnCollectNoteMqDTO collectUnCollectNoteMqDTO = JsonUtils.parseObject(body, CollectUnCollectNoteMqDTO.class);
|
||||||
|
if (Objects.isNull(collectUnCollectNoteMqDTO)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
log.info("## TodayNoteCollectIncrementData2DBConsumer 笔记收藏数据:{}", JsonUtils.toJsonString(collectUnCollectNoteMqDTO));
|
||||||
|
|
||||||
|
// 被收藏的笔记ID
|
||||||
|
Long noteId = collectUnCollectNoteMqDTO.getNoteId();
|
||||||
|
// 笔记作者ID
|
||||||
|
Long noteCreatorId = collectUnCollectNoteMqDTO.getNoteCreatorId();
|
||||||
|
// 今日日期
|
||||||
|
String date = LocalDate.now()
|
||||||
|
.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
|
||||||
|
|
||||||
|
String bloomKey = RedisKeyConstants.buildBloomUserNoteCollectListKey(date);
|
||||||
|
|
||||||
|
// 1. 布隆过滤器判断该日增量数据是否已经记录
|
||||||
|
DefaultRedisScript<Long> script = new DefaultRedisScript<>();
|
||||||
|
// Lua 脚本路径
|
||||||
|
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/lua/bloom_today_note_collect_check.lua")));
|
||||||
|
// 返回值类型
|
||||||
|
script.setResultType(Long.class);
|
||||||
|
|
||||||
|
// 执行 Lua 脚本,拿到返回结果
|
||||||
|
Long result = redisTemplate.execute(script, Collections.singletonList(bloomKey), noteId);
|
||||||
|
log.info("布隆过滤器判断结果:{}", result);
|
||||||
|
|
||||||
|
// 若布隆过滤器判断不存在(绝对正确)
|
||||||
|
if (Objects.equals(result, 0L)) {
|
||||||
|
// 2. 若无,才会落库,减轻数据库压力
|
||||||
|
// 根据分片总数,取模,分别获取对应的分片序号
|
||||||
|
long userIdHashKey = noteCreatorId % tableShards;
|
||||||
|
long noteIdHashKey = noteId % tableShards;
|
||||||
|
log.info("根据分片总数,取模,分别获取对应的分片序号user:{},note:{}", userIdHashKey, noteIdHashKey);
|
||||||
|
|
||||||
|
// 编程式事务,保证多语句的原子性
|
||||||
|
transactionTemplate.execute(status -> {
|
||||||
|
try {
|
||||||
|
// 将日增量变更数据,分别写入两张表
|
||||||
|
// - t_data_align_note_collect_count_temp_日期_分片序号
|
||||||
|
// - t_data_align_user_collect_count_temp_日期_分片序号
|
||||||
|
insertRecordMapper.insert2DataAlignNoteCollectCountTempTable(TableConstants.buildTableNameSuffix(date, noteIdHashKey), noteId);
|
||||||
|
insertRecordMapper.insert2DataAlignUserCollectCountTempTable(TableConstants.buildTableNameSuffix(date, userIdHashKey), noteCreatorId);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
} catch (Exception ex) {
|
||||||
|
status.setRollbackOnly(); // 标记事务为回滚
|
||||||
|
log.error("", ex);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
});
|
||||||
|
|
||||||
|
// 3. 数据库写入成功后,再添加布隆过滤器中
|
||||||
|
// 4. 数据库写入成功后,再添加布隆过滤器中
|
||||||
|
RedisScript<Long> bloomAddScript = RedisScript.of("return redis.call('BF.ADD', KEYS[1], ARGV[1])", Long.class);
|
||||||
|
redisTemplate.execute(bloomAddScript, Collections.singletonList(bloomKey), noteId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -17,4 +17,13 @@ public interface InsertRecordMapper {
|
|||||||
*/
|
*/
|
||||||
void insert2DataAlignUserLikeCountTempTable(@Param("tableNameSuffix") String tableNameSuffix, @Param("userId") Long userId);
|
void insert2DataAlignUserLikeCountTempTable(@Param("tableNameSuffix") String tableNameSuffix, @Param("userId") Long userId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 笔记收藏数:计数变更
|
||||||
|
*/
|
||||||
|
void insert2DataAlignNoteCollectCountTempTable(@Param("tableNameSuffix") String tableNameSuffix, @Param("noteId") Long noteId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 用户获得的收藏数:计数变更
|
||||||
|
*/
|
||||||
|
void insert2DataAlignUserCollectCountTempTable(@Param("tableNameSuffix") String tableNameSuffix, @Param("userId") Long userId);
|
||||||
}
|
}
|
||||||
@@ -0,0 +1,31 @@
|
|||||||
|
package com.hanserwei.hannote.data.align.model.vo;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Builder;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@AllArgsConstructor
|
||||||
|
@NoArgsConstructor
|
||||||
|
@Builder
|
||||||
|
public class CollectUnCollectNoteMqDTO {
|
||||||
|
|
||||||
|
private Long userId;
|
||||||
|
|
||||||
|
private Long noteId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 0: 取消收藏, 1:收藏
|
||||||
|
*/
|
||||||
|
private Integer type;
|
||||||
|
|
||||||
|
private LocalDateTime createTime;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 笔记发布者 ID
|
||||||
|
*/
|
||||||
|
private Long noteCreatorId;
|
||||||
|
}
|
||||||
@@ -0,0 +1,16 @@
|
|||||||
|
-- LUA 脚本:日增量笔记收藏、取消收藏变更数据布隆过滤器
|
||||||
|
|
||||||
|
local key = KEYS[1] -- 操作的 Redis Key
|
||||||
|
local noteIdAndNoteCreatorId = ARGV[1] -- Redis Value
|
||||||
|
|
||||||
|
-- 使用 EXISTS 命令检查布隆过滤器是否存在
|
||||||
|
local exists = redis.call('EXISTS', key)
|
||||||
|
if exists == 0 then
|
||||||
|
-- 创建布隆过滤器
|
||||||
|
redis.call('BF.ADD', key, '')
|
||||||
|
-- 设置过期时间,一天后过期
|
||||||
|
redis.call("EXPIRE", key, 20 * 60 * 60)
|
||||||
|
end
|
||||||
|
|
||||||
|
-- 校验该变更数据是否已经存在(1 表示已存在,0 表示不存在)
|
||||||
|
return redis.call('BF.EXISTS', key, noteIdAndNoteCreatorId)
|
||||||
@@ -10,4 +10,14 @@
|
|||||||
insert into `t_data_align_user_like_count_temp_${tableNameSuffix}` (user_id)
|
insert into `t_data_align_user_like_count_temp_${tableNameSuffix}` (user_id)
|
||||||
values (#{userId})
|
values (#{userId})
|
||||||
</insert>
|
</insert>
|
||||||
|
|
||||||
|
<insert id="insert2DataAlignNoteCollectCountTempTable" parameterType="map">
|
||||||
|
insert into `t_data_align_note_collect_count_temp_${tableNameSuffix}` (note_id)
|
||||||
|
values (#{noteId})
|
||||||
|
</insert>
|
||||||
|
|
||||||
|
<insert id="insert2DataAlignUserCollectCountTempTable" parameterType="map">
|
||||||
|
insert into `t_data_align_user_collect_count_temp_${tableNameSuffix}` (user_id)
|
||||||
|
values (#{userId})
|
||||||
|
</insert>
|
||||||
</mapper>
|
</mapper>
|
||||||
@@ -465,7 +465,7 @@ public class NoteServiceImpl extends ServiceImpl<NoteDOMapper, NoteDO> implement
|
|||||||
String contentUuid = noteDO1.getContentUuid();
|
String contentUuid = noteDO1.getContentUuid();
|
||||||
|
|
||||||
// 笔记内容是否更新成功
|
// 笔记内容是否更新成功
|
||||||
boolean isUpdateContentSuccess = false;
|
boolean isUpdateContentSuccess;
|
||||||
if (StringUtils.isBlank(content)) {
|
if (StringUtils.isBlank(content)) {
|
||||||
// 若笔记内容为空,则删除 K-V 存储
|
// 若笔记内容为空,则删除 K-V 存储
|
||||||
isUpdateContentSuccess = keyValueRpcService.deleteNoteContent(contentUuid);
|
isUpdateContentSuccess = keyValueRpcService.deleteNoteContent(contentUuid);
|
||||||
@@ -904,9 +904,7 @@ public class NoteServiceImpl extends ServiceImpl<NoteDOMapper, NoteDO> implement
|
|||||||
// 若目标笔记已经收藏
|
// 若目标笔记已经收藏
|
||||||
if (count > 0) {
|
if (count > 0) {
|
||||||
// 异步初始化布隆过滤器
|
// 异步初始化布隆过滤器
|
||||||
threadPoolTaskExecutor.submit(() -> {
|
threadPoolTaskExecutor.submit(() -> batchAddNoteCollect2BloomAndExpire(userId, expireSeconds, bloomUserNoteCollectListKey));
|
||||||
batchAddNoteCollect2BloomAndExpire(userId, expireSeconds, bloomUserNoteCollectListKey);
|
|
||||||
});
|
|
||||||
throw new ApiException(ResponseCodeEnum.NOTE_ALREADY_COLLECTED);
|
throw new ApiException(ResponseCodeEnum.NOTE_ALREADY_COLLECTED);
|
||||||
}
|
}
|
||||||
// 若目标笔记未被收藏,查询当前用户是否有收藏其他笔记,有则同步初始化布隆过滤器
|
// 若目标笔记未被收藏,查询当前用户是否有收藏其他笔记,有则同步初始化布隆过滤器
|
||||||
@@ -1037,7 +1035,7 @@ public class NoteServiceImpl extends ServiceImpl<NoteDOMapper, NoteDO> implement
|
|||||||
|
|
||||||
DefaultRedisScript<Long> script = new DefaultRedisScript<>();
|
DefaultRedisScript<Long> script = new DefaultRedisScript<>();
|
||||||
// Lua 脚本路径
|
// Lua 脚本路径
|
||||||
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/lua/bloom_note_uncollect_check.lua")));
|
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/lua/bloom_note_uncollected_check.lua")));
|
||||||
// 返回值类型
|
// 返回值类型
|
||||||
script.setResultType(Long.class);
|
script.setResultType(Long.class);
|
||||||
|
|
||||||
@@ -1073,7 +1071,10 @@ public class NoteServiceImpl extends ServiceImpl<NoteDOMapper, NoteDO> implement
|
|||||||
// 用户收藏列表 ZSet Key
|
// 用户收藏列表 ZSet Key
|
||||||
String userNoteCollectZSetKey = RedisKeyConstants.buildUserNoteCollectZSetKey(userId);
|
String userNoteCollectZSetKey = RedisKeyConstants.buildUserNoteCollectZSetKey(userId);
|
||||||
|
|
||||||
redisTemplate.opsForZSet().remove(userNoteCollectZSetKey, noteId);
|
Long removed = redisTemplate.opsForZSet().remove(userNoteCollectZSetKey, noteId);
|
||||||
|
if (removed != null && removed == 0) {
|
||||||
|
throw new ApiException(ResponseCodeEnum.NOTE_NOT_COLLECTED);
|
||||||
|
}
|
||||||
|
|
||||||
// 4. 发送 MQ, 数据更新落库
|
// 4. 发送 MQ, 数据更新落库
|
||||||
// 构建消息体 DTO
|
// 构建消息体 DTO
|
||||||
|
|||||||
Reference in New Issue
Block a user