feat(data-align): 实现笔记点赞增量数据处理与布隆过滤器校验

- 新增 Redis 布隆过滤器 Lua 脚本,用于校验日增量变更数据- 创建 InsertRecordMapper 及 XML 映射文件,支持笔记和用户点赞数落库
- 定义 LikeUnlikeNoteMqDTO用于 MQ 消息传输
- 配置 RedisTemplate 支持 JSON 序列化
- 修改 TableConstants 中 buildTableNameSuffix 方法参数类型为 long
- 实现 TodayNoteLikeIncrementData2DBConsumer 消费者逻辑:
  - 使用布隆过滤器去重判断 - 数据库写入操作使用事务保证原子性
  - 写入成功后更新布隆过滤器
- 更新 IntelliJ IDEA 数据源映射与 SQL 检查配置
This commit is contained in:
2025-10-21 19:45:43 +08:00
parent 8a1681e590
commit c1c0590cce
10 changed files with 225 additions and 1 deletions

View File

@@ -0,0 +1,31 @@
package com.hanserwei.hannote.data.align.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisTemplateConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
// 设置 RedisTemplate 的连接工厂
redisTemplate.setConnectionFactory(connectionFactory);
// 使用 StringRedisSerializer 来序列化和反序列化 redis 的 key 值,确保 key 是可读的字符串
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
// 使用 Jackson2JsonRedisSerializer 来序列化和反序列化 redis 的 value 值, 确保存储的是 JSON 格式
Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
redisTemplate.setValueSerializer(serializer);
redisTemplate.setHashValueSerializer(serializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}

View File

@@ -0,0 +1,21 @@
package com.hanserwei.hannote.data.align.constant;
public class RedisKeyConstants {
/**
* 布隆过滤器:日增量变更数据,用户笔记点赞,取消点赞 前缀
*/
public static final String BLOOM_TODAY_NOTE_LIKE_LIST_KEY = "bloom:dataAlign:note:likes:";
/**
* 构建完整的布隆过滤器:日增量变更数据,用户笔记点赞,取消点赞 KEY
*
* @param date 日期
* @return 完整的布隆过滤器:日增量变更数据,用户笔记点赞,取消点赞 KEY
*/
public static String buildBloomUserNoteLikeListKey(String date) {
return BLOOM_TODAY_NOTE_LIKE_LIST_KEY + date;
}
}

View File

@@ -13,7 +13,7 @@ public class TableConstants {
* @param hashKey 哈希Keu
* @return 表名后缀
*/
public static String buildTableNameSuffix(String date, int hashKey) {
public static String buildTableNameSuffix(String date, long hashKey) {
// 拼接完整的表名
return date + TABLE_NAME_SEPARATE + hashKey;
}

View File

@@ -1,10 +1,28 @@
package com.hanserwei.hannote.data.align.consumer;
import com.hanserwei.framework.common.utils.JsonUtils;
import com.hanserwei.hannote.data.align.constant.MQConstants;
import com.hanserwei.hannote.data.align.constant.RedisKeyConstants;
import com.hanserwei.hannote.data.align.constant.TableConstants;
import com.hanserwei.hannote.data.align.domain.mapper.InsertRecordMapper;
import com.hanserwei.hannote.data.align.model.vo.LikeUnlikeNoteMqDTO;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionTemplate;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Objects;
@Component
@Slf4j
@@ -13,8 +31,78 @@ import org.springframework.stereotype.Component;
topic = MQConstants.TOPIC_COUNT_NOTE_LIKE
)
public class TodayNoteLikeIncrementData2DBConsumer implements RocketMQListener<String> {
@Resource
private RedisTemplate<String, Object> redisTemplate;
@Resource
private TransactionTemplate transactionTemplate;
@Resource
private InsertRecordMapper insertRecordMapper;
/**
* 表总分片数
*/
@Value("${table.shards}")
private int tableShards;
@Override
public void onMessage(String body) {
log.info("## TodayNoteLikeIncrementData2DBConsumer 消费到了 MQ: {}", body);
// 1. 布隆过滤器判断该日增量数据是否已经记录
// Json字符串转DTO
LikeUnlikeNoteMqDTO noteLikeCountMqDTO = JsonUtils.parseObject(body, LikeUnlikeNoteMqDTO.class);
if (Objects.isNull(noteLikeCountMqDTO)) {
return;
}
log.info("## TodayNoteLikeIncrementData2DBConsumer 笔记点赞数据:{}", JsonUtils.toJsonString(noteLikeCountMqDTO));
// 获取被点赞或者取消点赞的笔记ID
Long noteId = noteLikeCountMqDTO.getNoteId();
// 获取点赞或取消点赞的笔记的创建者ID
Long noteCreatorId = noteLikeCountMqDTO.getNoteCreatorId();
// 今日日期
String date = LocalDate.now()
.format(DateTimeFormatter.ofPattern("yyyyMMdd")); // 转字符串
String bloomKey = RedisKeyConstants.buildBloomUserNoteLikeListKey(date);
// 1. 布隆过滤器判断该日增量数据是否已经记录
DefaultRedisScript<Long> script = new DefaultRedisScript<>();
// Lua 脚本路径
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/lua/bloom_today_note_like_check.lua")));
// 返回值类型
script.setResultType(Long.class);
// 执行 Lua 脚本,拿到返回结果
Long result = redisTemplate.execute(script, Collections.singletonList(bloomKey), noteId);
log.info("布隆过滤器判断结果:{}", result);
// 若布隆过滤器判断不存在(绝对正确)
if (Objects.equals(result, 0L)) {
// 2. 若无,才会落库,减轻数据库压力
// 根据分片总数,取模,分别获取对应的分片序号
long userIdHashKey = noteCreatorId % tableShards;
long noteIdHashKey = noteId % tableShards;
log.info("根据分片总数取模分别获取对应的分片序号user:{},note:{}", userIdHashKey, noteIdHashKey);
// 编程式事务,保证多语句的原子性
transactionTemplate.execute(status -> {
try {
// 将日增量变更数据,分别写入两张表
// - t_data_align_note_like_count_temp_日期_分片序号
// - t_data_align_user_like_count_temp_日期_分片序号
insertRecordMapper.insert2DataAlignNoteLikeCountTempTable(TableConstants.buildTableNameSuffix(date, noteIdHashKey), noteId);
insertRecordMapper.insert2DataAlignUserLikeCountTempTable(TableConstants.buildTableNameSuffix(date, userIdHashKey), noteCreatorId);
return true;
} catch (Exception ex) {
status.setRollbackOnly();
log.error("## TodayNoteLikeIncrementData2DBConsumer 落库失败,回滚事务", ex);
}
return false;
});
// 3. 数据库写入成功后,再添加布隆过滤器中
// 4. 数据库写入成功后,再添加布隆过滤器中
RedisScript<Long> bloomAddScript = RedisScript.of("return redis.call('BF.ADD', KEYS[1], ARGV[1])", Long.class);
redisTemplate.execute(bloomAddScript, Collections.singletonList(bloomKey), noteId);
}
}
}

View File

@@ -0,0 +1,20 @@
package com.hanserwei.hannote.data.align.domain.mapper;
import org.apache.ibatis.annotations.Param;
/**
* 添加记录
*/
public interface InsertRecordMapper {
/**
* 笔记点赞数:计数变更
*/
void insert2DataAlignNoteLikeCountTempTable(@Param("tableNameSuffix") String tableNameSuffix, @Param("noteId") Long noteId);
/**
* 用户获得的点赞数:计数变更
*/
void insert2DataAlignUserLikeCountTempTable(@Param("tableNameSuffix") String tableNameSuffix, @Param("userId") Long userId);
}

View File

@@ -0,0 +1,31 @@
package com.hanserwei.hannote.data.align.model.vo;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class LikeUnlikeNoteMqDTO {
private Long userId;
private Long noteId;
/**
* 0: 取消点赞, 1点赞
*/
private Integer type;
/**
* 笔记发布者 ID
*/
private Long noteCreatorId;
private LocalDateTime createTime;
}

View File

@@ -0,0 +1,16 @@
-- LUA 脚本:自增量笔记点赞、取消点赞变更数据布隆过滤器
local key = KEYS[1] -- 操作的 Redis Key
local noteIdAndNoteCreatorId = ARGV[1] -- Redis Value
-- 使用 EXISTS 命令检查布隆过滤器是否存在
local exists = redis.call('EXISTS', key)
if exists == 0 then
-- 创建布隆过滤器
redis.call('BF.ADD', key, '')
-- 设置过期时间,一天后过期
redis.call("EXPIRE", key, 20 * 60 * 60)
end
-- 校验该变更数据是否已经存在(1 表示已存在0 表示不存在)
return redis.call('BF.EXISTS', key, noteIdAndNoteCreatorId)

View File

@@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.hanserwei.hannote.data.align.domain.mapper.InsertRecordMapper">
<insert id="insert2DataAlignNoteLikeCountTempTable" parameterType="map">
insert into `t_data_align_note_like_count_temp_${tableNameSuffix}` (note_id)
values (#{noteId})
</insert>
<insert id="insert2DataAlignUserLikeCountTempTable" parameterType="map">
insert into `t_data_align_user_like_count_temp_${tableNameSuffix}` (user_id)
values (#{userId})
</insert>
</mapper>