refactor(data-align):优化数据对齐任务与MQ消费逻辑
fix(data-align,note):点赞同一用户发布的两篇不同笔记,无法保存变更记录。点赞笔记的SQL查询错误修复。 - 移除了事务模板,简化数据库操作流程 - 分离笔记ID与用户ID的布隆过滤器处理逻辑- 新增针对笔记作者的点赞/收藏数变更记录 - 重构Redis键命名规范,区分笔记与用户维度 - 优化MQ消息处理流程,增强异常捕获机制 - 更新HTTP客户端测试用例与环境配置 -修复NoteServiceImpl中点赞查询的用户ID条件缺失 - 调整分片计算方式,提升数据分布均匀性
This commit is contained in:
@@ -3,14 +3,24 @@ package com.hanserwei.hannote.data.align.constant;
|
||||
public class RedisKeyConstants {
|
||||
|
||||
/**
|
||||
* 布隆过滤器:日增量变更数据,用户笔记点赞,取消点赞 前缀
|
||||
* 布隆过滤器:日增量变更数据,用户笔记点赞,取消点赞(笔记ID) 前缀
|
||||
*/
|
||||
public static final String BLOOM_TODAY_NOTE_LIKE_LIST_KEY = "bloom:dataAlign:note:likes:";
|
||||
public static final String BLOOM_TODAY_NOTE_LIKE_NOTE_ID_LIST_KEY = "bloom:dataAlign:note:like:noteIds";
|
||||
|
||||
/**
|
||||
* 布隆过滤器:日增量变更数据,用户笔记收藏,取消收藏 前缀
|
||||
* 布隆过滤器:日增量变更数据,用户笔记点赞,取消点赞(笔记发布者ID) 前缀
|
||||
*/
|
||||
public static final String BLOOM_TODAY_NOTE_COLLECT_LIST_KEY = "bloom:dataAlign:note:collects:";
|
||||
public static final String BLOOM_TODAY_NOTE_LIKE_USER_ID_LIST_KEY = "bloom:dataAlign:note:like:userIds";
|
||||
|
||||
/**
|
||||
* 布隆过滤器:日增量变更数据,用户笔记收藏,取消收藏(笔记ID) 前缀
|
||||
*/
|
||||
public static final String BLOOM_TODAY_NOTE_COLLECT_NOTE_ID_LIST_KEY = "bloom:dataAlign:note:collect:noteIds";
|
||||
|
||||
/**
|
||||
* 布隆过滤器:日增量变更数据,用户笔记收藏,取消收藏(笔记发布者ID) 前缀
|
||||
*/
|
||||
public static final String BLOOM_TODAY_NOTE_COLLECT_USER_ID_LIST_KEY = "bloom:dataAlign:note:collect:userIds";
|
||||
|
||||
/**
|
||||
* 布隆过滤器:日增量变更数据,用户笔记发布,删除 前缀
|
||||
@@ -49,13 +59,22 @@ public class RedisKeyConstants {
|
||||
|
||||
|
||||
/**
|
||||
* 构建完整的布隆过滤器:日增量变更数据,用户笔记点赞,取消点赞 KEY
|
||||
* 构建完整的布隆过滤器:日增量变更数据,用户笔记点赞,取消点赞(笔记ID) KEY
|
||||
* @param date 日期
|
||||
* @return 完整的布隆过滤器:日增量变更数据,用户笔记点赞,取消点赞(笔记ID) KEY
|
||||
*/
|
||||
public static String buildBloomUserNoteLikeNoteIdListKey(String date) {
|
||||
return BLOOM_TODAY_NOTE_LIKE_NOTE_ID_LIST_KEY + date;
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建完整的布隆过滤器:日增量变更数据,用户笔记点赞,取消点赞(笔记发布者ID) KEY
|
||||
*
|
||||
* @param date 日期
|
||||
* @return 完整的布隆过滤器:日增量变更数据,用户笔记点赞,取消点赞 KEY
|
||||
* @return 完整的布隆过滤器:日增量变更数据,用户笔记点赞,取消点赞(笔记发布者ID) KEY
|
||||
*/
|
||||
public static String buildBloomUserNoteLikeListKey(String date) {
|
||||
return BLOOM_TODAY_NOTE_LIKE_LIST_KEY + date;
|
||||
public static String buildBloomUserNoteLikeUserIdListKey(String date) {
|
||||
return BLOOM_TODAY_NOTE_LIKE_USER_ID_LIST_KEY + date;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -64,8 +83,18 @@ public class RedisKeyConstants {
|
||||
* @param date 日期
|
||||
* @return 完整的布隆过滤器:日增量变更数据,用户笔记收藏,取消收藏 KEY
|
||||
*/
|
||||
public static String buildBloomUserNoteCollectListKey(String date) {
|
||||
return BLOOM_TODAY_NOTE_COLLECT_LIST_KEY + date;
|
||||
public static String buildBloomUserNoteCollectNoteIdListKey(String date) {
|
||||
return BLOOM_TODAY_NOTE_COLLECT_NOTE_ID_LIST_KEY + date;
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建完整的布隆过滤器:日增量变更数据,用户笔记收藏,取消收藏 KEY
|
||||
*
|
||||
* @param date 日期
|
||||
* @return 完整的布隆过滤器:日增量变更数据,用户笔记收藏,取消收藏 KEY
|
||||
*/
|
||||
public static String buildBloomUserNoteCollectUserIdListKey(String date) {
|
||||
return BLOOM_TODAY_NOTE_COLLECT_USER_ID_LIST_KEY + date;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -17,7 +17,6 @@ import org.springframework.data.redis.core.script.DefaultRedisScript;
|
||||
import org.springframework.data.redis.core.script.RedisScript;
|
||||
import org.springframework.scripting.support.ResourceScriptSource;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.support.TransactionTemplate;
|
||||
|
||||
import java.time.LocalDate;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
@@ -35,8 +34,6 @@ public class TodayNoteCollectIncrementData2DBConsumer implements RocketMQListene
|
||||
@Resource
|
||||
private RedisTemplate<String, Object> redisTemplate;
|
||||
@Resource
|
||||
private TransactionTemplate transactionTemplate;
|
||||
@Resource
|
||||
private InsertMapper insertMapper;
|
||||
|
||||
/**
|
||||
@@ -64,7 +61,10 @@ public class TodayNoteCollectIncrementData2DBConsumer implements RocketMQListene
|
||||
String date = LocalDate.now()
|
||||
.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
|
||||
|
||||
String bloomKey = RedisKeyConstants.buildBloomUserNoteCollectListKey(date);
|
||||
// ------------------------- 笔记的收藏数变更记录 -------------------------
|
||||
|
||||
// 笔记对应的 Bloom Key
|
||||
String noteBloomKey = RedisKeyConstants.buildBloomUserNoteCollectNoteIdListKey(date);
|
||||
|
||||
// 1. 布隆过滤器判断该日增量数据是否已经记录
|
||||
DefaultRedisScript<Long> script = new DefaultRedisScript<>();
|
||||
@@ -74,38 +74,46 @@ public class TodayNoteCollectIncrementData2DBConsumer implements RocketMQListene
|
||||
script.setResultType(Long.class);
|
||||
|
||||
// 执行 Lua 脚本,拿到返回结果
|
||||
Long result = redisTemplate.execute(script, Collections.singletonList(bloomKey), noteId);
|
||||
log.info("布隆过滤器判断结果:{}", result);
|
||||
Long result = redisTemplate.execute(script, Collections.singletonList(noteBloomKey), noteId);
|
||||
|
||||
// Lua 脚本:添加到布隆过滤器
|
||||
RedisScript<Long> bloomAddScript = RedisScript.of("return redis.call('BF.ADD', KEYS[1], ARGV[1])", Long.class);
|
||||
|
||||
// 若布隆过滤器判断不存在(绝对正确)
|
||||
if (Objects.equals(result, 0L)) {
|
||||
// 2. 若无,才会落库,减轻数据库压力
|
||||
// 根据分片总数,取模,分别获取对应的分片序号
|
||||
long userIdHashKey = noteCreatorId % tableShards;
|
||||
// 若无,才会落库数据库
|
||||
|
||||
// 根据分片总数,取模,获取对应的分片序号
|
||||
long noteIdHashKey = noteId % tableShards;
|
||||
log.info("根据分片总数,取模,分别获取对应的分片序号user:{},note:{}", userIdHashKey, noteIdHashKey);
|
||||
|
||||
// 编程式事务,保证多语句的原子性
|
||||
transactionTemplate.execute(status -> {
|
||||
try {
|
||||
// 将日增量变更数据,分别写入两张表
|
||||
// - t_data_align_note_collect_count_temp_日期_分片序号
|
||||
// - t_data_align_user_collect_count_temp_日期_分片序号
|
||||
insertMapper.insert2DataAlignNoteCollectCountTempTable(TableConstants.buildTableNameSuffix(date, noteIdHashKey), noteId);
|
||||
insertMapper.insert2DataAlignUserCollectCountTempTable(TableConstants.buildTableNameSuffix(date, userIdHashKey), noteCreatorId);
|
||||
} catch (Exception e) {
|
||||
log.error("## TodayNoteCollectIncrementData2DBConsumer 笔记收藏数变更记录失败:{}", e.getMessage());
|
||||
}
|
||||
|
||||
return true;
|
||||
} catch (Exception ex) {
|
||||
status.setRollbackOnly(); // 标记事务为回滚
|
||||
log.error("", ex);
|
||||
// 数据库落库成功后,再添加布隆过滤器中
|
||||
redisTemplate.execute(bloomAddScript, Collections.singletonList(noteBloomKey), noteId);
|
||||
}
|
||||
return false;
|
||||
});
|
||||
|
||||
// 3. 数据库写入成功后,再添加布隆过滤器中
|
||||
// 4. 数据库写入成功后,再添加布隆过滤器中
|
||||
RedisScript<Long> bloomAddScript = RedisScript.of("return redis.call('BF.ADD', KEYS[1], ARGV[1])", Long.class);
|
||||
redisTemplate.execute(bloomAddScript, Collections.singletonList(bloomKey), noteId);
|
||||
// ------------------------- 笔记作者的收藏数变更记录 -------------------------
|
||||
// 笔记作者对应的 Bloom Key
|
||||
String userBloomKey = RedisKeyConstants.buildBloomUserNoteCollectUserIdListKey(date);
|
||||
// 执行 Lua 脚本,拿到返回结果
|
||||
result = redisTemplate.execute(script, Collections.singletonList(userBloomKey), noteCreatorId);
|
||||
// 若布隆过滤器判断不存在(绝对正确)
|
||||
if (Objects.equals(result, 0L)) {
|
||||
// 若无,才会落库数据库
|
||||
|
||||
// 根据分片总数,取模,获取对应的分片序号
|
||||
long noteCreatorIdHashKey = noteCreatorId % tableShards;
|
||||
try {
|
||||
insertMapper.insert2DataAlignUserCollectCountTempTable(TableConstants.buildTableNameSuffix(date, noteCreatorIdHashKey), noteCreatorId);
|
||||
} catch (Exception e) {
|
||||
log.error("## TodayNoteCollectIncrementData2DBConsumer 笔记作者的收藏数变更记录失败:{}", e.getMessage());
|
||||
}
|
||||
// 数据库落库成功后,再添加布隆过滤器中
|
||||
redisTemplate.execute(bloomAddScript, Collections.singletonList(userBloomKey), noteCreatorId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,7 +17,6 @@ import org.springframework.data.redis.core.script.DefaultRedisScript;
|
||||
import org.springframework.data.redis.core.script.RedisScript;
|
||||
import org.springframework.scripting.support.ResourceScriptSource;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.support.TransactionTemplate;
|
||||
|
||||
import java.time.LocalDate;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
@@ -35,8 +34,6 @@ public class TodayNoteLikeIncrementData2DBConsumer implements RocketMQListener<S
|
||||
@Resource
|
||||
private RedisTemplate<String, Object> redisTemplate;
|
||||
@Resource
|
||||
private TransactionTemplate transactionTemplate;
|
||||
@Resource
|
||||
private InsertMapper insertMapper;
|
||||
|
||||
/**
|
||||
@@ -44,26 +41,28 @@ public class TodayNoteLikeIncrementData2DBConsumer implements RocketMQListener<S
|
||||
*/
|
||||
@Value("${table.shards}")
|
||||
private int tableShards;
|
||||
|
||||
@Override
|
||||
public void onMessage(String body) {
|
||||
log.info("## TodayNoteLikeIncrementData2DBConsumer 消费到了 MQ: {}", body);
|
||||
// 1. 布隆过滤器判断该日增量数据是否已经记录
|
||||
// Json字符串转DTO
|
||||
LikeUnlikeNoteMqDTO noteLikeCountMqDTO = JsonUtils.parseObject(body, LikeUnlikeNoteMqDTO.class);
|
||||
if (Objects.isNull(noteLikeCountMqDTO)) {
|
||||
return;
|
||||
}
|
||||
log.info("## TodayNoteLikeIncrementData2DBConsumer 笔记点赞数据:{}", JsonUtils.toJsonString(noteLikeCountMqDTO));
|
||||
// 获取被点赞或者取消点赞的笔记ID
|
||||
Long noteId = noteLikeCountMqDTO.getNoteId();
|
||||
// 获取点赞或取消点赞的笔记的创建者ID
|
||||
Long noteCreatorId = noteLikeCountMqDTO.getNoteCreatorId();
|
||||
|
||||
// 消息体 JSON 字符串转 DTO
|
||||
LikeUnlikeNoteMqDTO unlikeNoteMqDTO = JsonUtils.parseObject(body, LikeUnlikeNoteMqDTO.class);
|
||||
|
||||
if (Objects.isNull(unlikeNoteMqDTO)) return;
|
||||
|
||||
// 被点赞、取消点赞的笔记 ID
|
||||
Long noteId = unlikeNoteMqDTO.getNoteId();
|
||||
// 笔记的发布者 ID
|
||||
Long noteCreatorId = unlikeNoteMqDTO.getNoteCreatorId();
|
||||
|
||||
// 今日日期
|
||||
String date = LocalDate.now()
|
||||
.format(DateTimeFormatter.ofPattern("yyyyMMdd")); // 转字符串
|
||||
.format(DateTimeFormatter.ofPattern("yyyyMMdd")); // 格式化
|
||||
|
||||
String bloomKey = RedisKeyConstants.buildBloomUserNoteLikeListKey(date);
|
||||
// ------------------------- 笔记的点赞数变更记录 -------------------------
|
||||
// 笔记对应的 Bloom Key
|
||||
String noteBloomKey = RedisKeyConstants.buildBloomUserNoteLikeNoteIdListKey(date);
|
||||
|
||||
// 1. 布隆过滤器判断该日增量数据是否已经记录
|
||||
DefaultRedisScript<Long> script = new DefaultRedisScript<>();
|
||||
@@ -73,36 +72,52 @@ public class TodayNoteLikeIncrementData2DBConsumer implements RocketMQListener<S
|
||||
script.setResultType(Long.class);
|
||||
|
||||
// 执行 Lua 脚本,拿到返回结果
|
||||
Long result = redisTemplate.execute(script, Collections.singletonList(bloomKey), noteId);
|
||||
log.info("布隆过滤器判断结果:{}", result);
|
||||
Long result = redisTemplate.execute(script, Collections.singletonList(noteBloomKey), noteId);
|
||||
|
||||
// Lua 脚本:添加到布隆过滤器
|
||||
RedisScript<Long> bloomAddScript = RedisScript.of("return redis.call('BF.ADD', KEYS[1], ARGV[1])", Long.class);
|
||||
|
||||
// 若布隆过滤器判断不存在(绝对正确)
|
||||
if (Objects.equals(result, 0L)) {
|
||||
// 2. 若无,才会落库,减轻数据库压力
|
||||
// 根据分片总数,取模,分别获取对应的分片序号
|
||||
long userIdHashKey = noteCreatorId % tableShards;
|
||||
long noteIdHashKey = noteId % tableShards;
|
||||
log.info("根据分片总数,取模,分别获取对应的分片序号user:{},note:{}", userIdHashKey, noteIdHashKey);
|
||||
|
||||
// 编程式事务,保证多语句的原子性
|
||||
transactionTemplate.execute(status -> {
|
||||
// 根据分片总数,取模,获取对应的分片序号
|
||||
long noteIdHashKey = noteId % tableShards;
|
||||
|
||||
try {
|
||||
// 将日增量变更数据,分别写入两张表
|
||||
// 将日增量变更数据落库
|
||||
// - t_data_align_note_like_count_temp_日期_分片序号
|
||||
// - t_data_align_user_like_count_temp_日期_分片序号
|
||||
insertMapper.insert2DataAlignNoteLikeCountTempTable(TableConstants.buildTableNameSuffix(date, noteIdHashKey), noteId);
|
||||
insertMapper.insert2DataAlignUserLikeCountTempTable(TableConstants.buildTableNameSuffix(date, userIdHashKey), noteCreatorId);
|
||||
return true;
|
||||
} catch (Exception ex) {
|
||||
status.setRollbackOnly();
|
||||
log.error("## TodayNoteLikeIncrementData2DBConsumer 落库失败,回滚事务", ex);
|
||||
} catch (Exception e) {
|
||||
log.error("", e);
|
||||
}
|
||||
return false;
|
||||
});
|
||||
// 3. 数据库写入成功后,再添加布隆过滤器中
|
||||
|
||||
// 4. 数据库写入成功后,再添加布隆过滤器中
|
||||
RedisScript<Long> bloomAddScript = RedisScript.of("return redis.call('BF.ADD', KEYS[1], ARGV[1])", Long.class);
|
||||
redisTemplate.execute(bloomAddScript, Collections.singletonList(bloomKey), noteId);
|
||||
redisTemplate.execute(bloomAddScript, Collections.singletonList(noteBloomKey), noteId);
|
||||
}
|
||||
|
||||
// ------------------------- 笔记发布者获得的点赞数变更记录 -------------------------
|
||||
// 笔记发布者对应的 Bloom Key
|
||||
String userBloomKey = RedisKeyConstants.buildBloomUserNoteLikeUserIdListKey(date);
|
||||
// 执行 Lua 脚本,拿到返回结果
|
||||
result = redisTemplate.execute(script, Collections.singletonList(userBloomKey), noteCreatorId);
|
||||
// 若布隆过滤器判断不存在(绝对正确)
|
||||
if (Objects.equals(result, 0L)) {
|
||||
// 2. 若无,才会落库,减轻数据库压力
|
||||
|
||||
// 根据分片总数,取模,获取对应的分片序号
|
||||
long userIdHashKey = noteCreatorId % tableShards;
|
||||
|
||||
try {
|
||||
// 将日增量变更数据落库
|
||||
// - t_data_align_user_like_count_temp_日期_分片序号
|
||||
insertMapper.insert2DataAlignUserLikeCountTempTable(TableConstants.buildTableNameSuffix(date, userIdHashKey), noteCreatorId);
|
||||
} catch (Exception e) {
|
||||
log.error("", e);
|
||||
}
|
||||
|
||||
// 4. 数据库写入成功后,再添加布隆过滤器中
|
||||
redisTemplate.execute(bloomAddScript, Collections.singletonList(userBloomKey), noteCreatorId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -34,7 +34,7 @@ public class FollowingCountShardingXxlJob {
|
||||
* 分片广播任务
|
||||
*/
|
||||
@XxlJob("followingCountShardingJobHandler")
|
||||
public void followingCountShardingJobHandler() throws Exception {
|
||||
public void followingCountShardingJobHandler() {
|
||||
// 获取分片参数
|
||||
// 分片序号
|
||||
int shardIndex = XxlJobHelper.getShardIndex();
|
||||
|
||||
@@ -651,6 +651,7 @@ public class NoteServiceImpl extends ServiceImpl<NoteDOMapper, NoteDO> implement
|
||||
//从数据库中校验笔记是否被点赞,并异步初始化布隆过滤器,设置过期时间
|
||||
long count = noteLikeDOService.count(new LambdaQueryWrapper<>(NoteLikeDO.class)
|
||||
.eq(NoteLikeDO::getNoteId, noteId)
|
||||
.eq(NoteLikeDO::getUserId, userId)
|
||||
.eq(NoteLikeDO::getStatus, LikeStatusEnum.LIKE.getCode()));
|
||||
|
||||
// 保底1天+随机秒数
|
||||
|
||||
@@ -77,8 +77,8 @@ Authorization: Bearer {{token}}
|
||||
"imgUris": [
|
||||
"https://cdn.pixabay.com/photo/2025/10/05/15/06/autumn-9875155_1280.jpg"
|
||||
],
|
||||
"title": "测试数据对齐图文笔记5",
|
||||
"content": "测试数据对齐测试数据对齐测试数据对齐测试5",
|
||||
"title": "bug修复2",
|
||||
"content": "bugbugbug",
|
||||
"topicId": 1
|
||||
}
|
||||
|
||||
@@ -197,16 +197,16 @@ Authorization: Bearer {{token}}
|
||||
### 笔记点赞入口
|
||||
POST http://localhost:8000/note/note/like
|
||||
Content-Type: application/json
|
||||
Authorization: Bearer {{token}}
|
||||
Authorization: Bearer {{thirdToken}}
|
||||
|
||||
{
|
||||
"id": 1977249693272375330
|
||||
"id": 1981698494959714362
|
||||
}
|
||||
|
||||
### 笔记取消点赞入口
|
||||
POST http://localhost:8000/note/note/unlike
|
||||
Content-Type: application/json
|
||||
Authorization: Bearer {{token}}
|
||||
Authorization: Bearer {{otherToken}}
|
||||
|
||||
{
|
||||
"id": 1977249693272375330
|
||||
@@ -215,16 +215,16 @@ Authorization: Bearer {{token}}
|
||||
### 笔记收藏入口
|
||||
POST http://localhost:8000/note/note/collect
|
||||
Content-Type: application/json
|
||||
Authorization: Bearer {{token}}
|
||||
Authorization: Bearer {{thirdToken}}
|
||||
|
||||
{
|
||||
"id": 1977249693272375330
|
||||
"id": 1981698494959714362
|
||||
}
|
||||
|
||||
### 笔记取消收藏入口
|
||||
POST http://localhost:8000/note/note/uncollect
|
||||
Content-Type: application/json
|
||||
Authorization: Bearer {{token}}
|
||||
Authorization: Bearer {{otherToken}}
|
||||
|
||||
{
|
||||
"id": 1977249693272375330
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
"dev": {
|
||||
"token": "4bXpiBbjXEDFE4ZpqjCOHu1rP81qepl2ROOygrxRGb61K536ckLuyAwfyQHSMcyRdUzf8CxntLEMfbU2ynbYx9nJKlx4vpWZrHqv2mI4iMhnShQ4mPBi7OPPgZi22O2f",
|
||||
"otherToken": "mqFNHrWkPcipIAvw7Gn4cigOWYP54sn8HYlQX3CXTxHf90DhjFiROhWVgPqLBi35xKXOOfHlXeEdaQrkXf1JXd8hbXBOdZqnrycW96BJwTbUS40EqIZifVgPun3ai0Ek",
|
||||
"thirdToken": "iA8XE1vFDXYwgNPnRyIrNaj5EKcQypUTtn91wCMGtF8FfFdFzvRUad4Q7shLkOgUQ5QMB5n25JP91vpYIvr7udoL1HUxdjlSlXCEXivTQlgaABkz5owdhzhyHqGg0XP8",
|
||||
"noteId": "1977249693272375330",
|
||||
"userId": "100",
|
||||
"otherUserId": "2100"
|
||||
|
||||
Reference in New Issue
Block a user