From ac65664dfe30ebd1d68c961651ed23ff974d3452 Mon Sep 17 00:00:00 2001 From: Hanserwei <2628273921@qq.com> Date: Fri, 24 Oct 2025 19:10:40 +0800 Subject: [PATCH] =?UTF-8?q?feat(data-align):=20=E5=AE=9E=E7=8E=B0=E7=94=A8?= =?UTF-8?q?=E6=88=B7=E5=85=B3=E6=B3=A8=E6=95=B0=E5=AF=B9=E9=BD=90=E5=88=86?= =?UTF-8?q?=E7=89=87=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 DeleteRecordMapper 接口及 XML 配置,支持批量删除临时表记录 - 新增 SelectRecordMapper 接口及 XML 配置,支持分批查询和统计关注数 - 新增 UpdateRecordMapper 接口及 XML 配置,用于更新用户关注总数 - 新增 FollowingCountShardingXxlJob 任务类,实现分片广播处理关注数对齐逻辑 -重命名 InsertRecordMapper为 InsertMapper 并同步更新相关引用 - 在 RedisKeyConstants 中新增构建用户计数 Key 的方法及相关常量 - 修改多个消费者类中的 Mapper 引用名称以匹配重命名后的接口 - 更新数据源映射文件,调整 Mapper XML 文件路径配置 --- .idea/data_source_mapping.xml | 3 +- .../align/constant/RedisKeyConstants.java | 20 ++++ ...ayNoteCollectIncrementData2DBConsumer.java | 8 +- ...TodayNoteLikeIncrementData2DBConsumer.java | 8 +- ...ayNotePublishIncrementData2DBConsumer.java | 6 +- ...dayUserFollowIncrementData2DBConsumer.java | 8 +- .../domain/mapper/DeleteRecordMapper.java | 16 +++ ...ertRecordMapper.java => InsertMapper.java} | 2 +- .../domain/mapper/SelectRecordMapper.java | 30 ++++++ .../domain/mapper/UpdateRecordMapper.java | 15 +++ .../job/FollowingCountShardingXxlJob.java | 101 ++++++++++++++++++ .../mapperxml/DeleteRecordMapper.xml | 12 +++ ...nsertRecordMapper.xml => InsertMapper.xml} | 2 +- .../mapperxml/SelectRecordMapper.xml | 16 +++ .../mapperxml/UpdateRecordMapper.xml | 9 ++ 15 files changed, 238 insertions(+), 18 deletions(-) create mode 100644 han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/DeleteRecordMapper.java rename han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/{InsertRecordMapper.java => InsertMapper.java} (97%) create mode 100644 han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/SelectRecordMapper.java create mode 100644 han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/UpdateRecordMapper.java create mode 100644 han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/job/FollowingCountShardingXxlJob.java create mode 100644 han-note-data-align/src/main/resources/mapperxml/DeleteRecordMapper.xml rename han-note-data-align/src/main/resources/mapperxml/{InsertRecordMapper.xml => InsertMapper.xml} (98%) create mode 100644 han-note-data-align/src/main/resources/mapperxml/SelectRecordMapper.xml create mode 100644 han-note-data-align/src/main/resources/mapperxml/UpdateRecordMapper.xml diff --git a/.idea/data_source_mapping.xml b/.idea/data_source_mapping.xml index 34a3631..ba1818a 100644 --- a/.idea/data_source_mapping.xml +++ b/.idea/data_source_mapping.xml @@ -3,7 +3,8 @@ - + + diff --git a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/constant/RedisKeyConstants.java b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/constant/RedisKeyConstants.java index 044d438..122161b 100644 --- a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/constant/RedisKeyConstants.java +++ b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/constant/RedisKeyConstants.java @@ -27,6 +27,26 @@ public class RedisKeyConstants { */ public static final String BLOOM_TODAY_USER_FANS_LIST_KEY = "bloom:dataAlign:user:fans:"; + /** + * Hash Field: 关注总数 + */ + public static final String FIELD_FOLLOWING_TOTAL = "followingTotal"; + + /** + * 用户维度计数 Key 前缀 + */ + private static final String COUNT_USER_KEY_PREFIX = "count:user:"; + + /** + * 构建用户维度计数 Key + * + * @param userId 用户 ID + * @return 用户维度计数 Key + */ + public static String buildCountUserKey(Long userId) { + return COUNT_USER_KEY_PREFIX + userId; + } + /** * 构建完整的布隆过滤器:日增量变更数据,用户笔记点赞,取消点赞 KEY diff --git a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/consumer/TodayNoteCollectIncrementData2DBConsumer.java b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/consumer/TodayNoteCollectIncrementData2DBConsumer.java index b133633..c853aed 100644 --- a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/consumer/TodayNoteCollectIncrementData2DBConsumer.java +++ b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/consumer/TodayNoteCollectIncrementData2DBConsumer.java @@ -4,7 +4,7 @@ import com.hanserwei.framework.common.utils.JsonUtils; import com.hanserwei.hannote.data.align.constant.MQConstants; import com.hanserwei.hannote.data.align.constant.RedisKeyConstants; import com.hanserwei.hannote.data.align.constant.TableConstants; -import com.hanserwei.hannote.data.align.domain.mapper.InsertRecordMapper; +import com.hanserwei.hannote.data.align.domain.mapper.InsertMapper; import com.hanserwei.hannote.data.align.model.vo.CollectUnCollectNoteMqDTO; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; @@ -37,7 +37,7 @@ public class TodayNoteCollectIncrementData2DBConsumer implements RocketMQListene @Resource private TransactionTemplate transactionTemplate; @Resource - private InsertRecordMapper insertRecordMapper; + private InsertMapper insertMapper; /** * 表总分片数 @@ -91,8 +91,8 @@ public class TodayNoteCollectIncrementData2DBConsumer implements RocketMQListene // 将日增量变更数据,分别写入两张表 // - t_data_align_note_collect_count_temp_日期_分片序号 // - t_data_align_user_collect_count_temp_日期_分片序号 - insertRecordMapper.insert2DataAlignNoteCollectCountTempTable(TableConstants.buildTableNameSuffix(date, noteIdHashKey), noteId); - insertRecordMapper.insert2DataAlignUserCollectCountTempTable(TableConstants.buildTableNameSuffix(date, userIdHashKey), noteCreatorId); + insertMapper.insert2DataAlignNoteCollectCountTempTable(TableConstants.buildTableNameSuffix(date, noteIdHashKey), noteId); + insertMapper.insert2DataAlignUserCollectCountTempTable(TableConstants.buildTableNameSuffix(date, userIdHashKey), noteCreatorId); return true; } catch (Exception ex) { diff --git a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/consumer/TodayNoteLikeIncrementData2DBConsumer.java b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/consumer/TodayNoteLikeIncrementData2DBConsumer.java index 4daa005..76c4e75 100644 --- a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/consumer/TodayNoteLikeIncrementData2DBConsumer.java +++ b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/consumer/TodayNoteLikeIncrementData2DBConsumer.java @@ -4,7 +4,7 @@ import com.hanserwei.framework.common.utils.JsonUtils; import com.hanserwei.hannote.data.align.constant.MQConstants; import com.hanserwei.hannote.data.align.constant.RedisKeyConstants; import com.hanserwei.hannote.data.align.constant.TableConstants; -import com.hanserwei.hannote.data.align.domain.mapper.InsertRecordMapper; +import com.hanserwei.hannote.data.align.domain.mapper.InsertMapper; import com.hanserwei.hannote.data.align.model.vo.LikeUnlikeNoteMqDTO; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; @@ -37,7 +37,7 @@ public class TodayNoteLikeIncrementData2DBConsumer implements RocketMQListener redisTemplate; @Resource - private InsertRecordMapper insertRecordMapper; + private InsertMapper insertMapper; /** * 表总分片数 @@ -81,7 +81,7 @@ public class TodayNotePublishIncrementData2DBConsumer implements RocketMQListene // 将日增量变更数据,写入日增量表中 // - t_data_align_note_publish_count_temp_日期_分片序号 - insertRecordMapper.insert2DataAlignUserNotePublishCountTempTable(TableConstants.buildTableNameSuffix(date, userIdHashKey), noteCreatorId); + insertMapper.insert2DataAlignUserNotePublishCountTempTable(TableConstants.buildTableNameSuffix(date, userIdHashKey), noteCreatorId); // 3. 数据库写入成功后,再添加布隆过滤器中 RedisScript bloomAddScript = RedisScript.of("return redis.call('BF.ADD', KEYS[1], ARGV[1])", Long.class); diff --git a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/consumer/TodayUserFollowIncrementData2DBConsumer.java b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/consumer/TodayUserFollowIncrementData2DBConsumer.java index 8b89939..b302ae6 100644 --- a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/consumer/TodayUserFollowIncrementData2DBConsumer.java +++ b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/consumer/TodayUserFollowIncrementData2DBConsumer.java @@ -4,7 +4,7 @@ import com.hanserwei.framework.common.utils.JsonUtils; import com.hanserwei.hannote.data.align.constant.MQConstants; import com.hanserwei.hannote.data.align.constant.RedisKeyConstants; import com.hanserwei.hannote.data.align.constant.TableConstants; -import com.hanserwei.hannote.data.align.domain.mapper.InsertRecordMapper; +import com.hanserwei.hannote.data.align.domain.mapper.InsertMapper; import com.hanserwei.hannote.data.align.model.vo.FollowUnfollowMqDTO; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; @@ -36,7 +36,7 @@ public class TodayUserFollowIncrementData2DBConsumer implements RocketMQListener private RedisTemplate redisTemplate; @Resource - private InsertRecordMapper insertRecordMapper; + private InsertMapper insertMapper; /** * 表总分片数 @@ -90,7 +90,7 @@ public class TodayUserFollowIncrementData2DBConsumer implements RocketMQListener try { // 将日增量变更数据,写入表 t_data_align_following_count_temp_日期_分片序号 - insertRecordMapper.insert2DataAlignUserFollowingCountTempTable( + insertMapper.insert2DataAlignUserFollowingCountTempTable( TableConstants.buildTableNameSuffix(date, userIdHashKey), userId); } catch (Exception e) { log.error("", e); @@ -115,7 +115,7 @@ public class TodayUserFollowIncrementData2DBConsumer implements RocketMQListener try { // 将日增量变更数据,写入表 t_data_align_fans_count_temp_日期_分片序号 - insertRecordMapper.insert2DataAlignUserFansCountTempTable( + insertMapper.insert2DataAlignUserFansCountTempTable( TableConstants.buildTableNameSuffix(date, targetUserIdHashKey), targetUserId); } catch (Exception e) { log.error("", e); diff --git a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/DeleteRecordMapper.java b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/DeleteRecordMapper.java new file mode 100644 index 0000000..de3d5ed --- /dev/null +++ b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/DeleteRecordMapper.java @@ -0,0 +1,16 @@ +package com.hanserwei.hannote.data.align.domain.mapper; + +import org.apache.ibatis.annotations.Param; + +import java.util.List; + +public interface DeleteRecordMapper { + + /** + * 日增量表:关注数计数变更 - 批量删除 + * + * @param userIds 用户 ID + */ + void batchDeleteDataAlignFollowingCountTempTable(@Param("tableNameSuffix") String tableNameSuffix, + @Param("userIds") List userIds); +} \ No newline at end of file diff --git a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/InsertRecordMapper.java b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/InsertMapper.java similarity index 97% rename from han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/InsertRecordMapper.java rename to han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/InsertMapper.java index 47f0bae..34ce10b 100644 --- a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/InsertRecordMapper.java +++ b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/InsertMapper.java @@ -5,7 +5,7 @@ import org.apache.ibatis.annotations.Param; /** * 添加记录 */ -public interface InsertRecordMapper { +public interface InsertMapper { /** * 笔记点赞数:计数变更 diff --git a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/SelectRecordMapper.java b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/SelectRecordMapper.java new file mode 100644 index 0000000..dbe3af4 --- /dev/null +++ b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/SelectRecordMapper.java @@ -0,0 +1,30 @@ +package com.hanserwei.hannote.data.align.domain.mapper; + +import org.apache.ibatis.annotations.Param; + +import java.util.List; + +/** + * 查询 + */ +public interface SelectRecordMapper { + + + /** + * 日增量表:关注数计数变更 - 批量查询 + * + * @param tableNameSuffix 表名后缀 + * @param batchSize 批量大小 + * @return 批量查询结果 + */ + List selectBatchFromDataAlignFollowingCountTempTable(@Param("tableNameSuffix") String tableNameSuffix, + @Param("batchSize") int batchSize); + + /** + * 查询 t_following 关注表,获取关注总数 + * + * @param userId 用户 ID + * @return 关注总数 + */ + int selectCountFromFollowingTableByUserId(long userId); +} \ No newline at end of file diff --git a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/UpdateRecordMapper.java b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/UpdateRecordMapper.java new file mode 100644 index 0000000..461f9ce --- /dev/null +++ b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/UpdateRecordMapper.java @@ -0,0 +1,15 @@ +package com.hanserwei.hannote.data.align.domain.mapper; + +import org.apache.ibatis.annotations.Param; + +public interface UpdateRecordMapper { + + /** + * 更新 t_user_count 计数表总关注数 + * + * @param userId 用户 ID + * @return 更新行数 + */ + int updateUserFollowingTotalByUserId(@Param("userId") long userId, + @Param("followingTotal") int followingTotal); +} \ No newline at end of file diff --git a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/job/FollowingCountShardingXxlJob.java b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/job/FollowingCountShardingXxlJob.java new file mode 100644 index 0000000..e15bb96 --- /dev/null +++ b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/job/FollowingCountShardingXxlJob.java @@ -0,0 +1,101 @@ +package com.hanserwei.hannote.data.align.job; + +import cn.hutool.core.collection.CollUtil; +import com.hanserwei.hannote.data.align.constant.RedisKeyConstants; +import com.hanserwei.hannote.data.align.constant.TableConstants; +import com.hanserwei.hannote.data.align.domain.mapper.DeleteRecordMapper; +import com.hanserwei.hannote.data.align.domain.mapper.SelectRecordMapper; +import com.hanserwei.hannote.data.align.domain.mapper.UpdateRecordMapper; +import com.xxl.job.core.context.XxlJobHelper; +import com.xxl.job.core.handler.annotation.XxlJob; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.util.List; + +@Component +@Slf4j +public class FollowingCountShardingXxlJob { + + @Resource + private SelectRecordMapper selectRecordMapper; + @Resource + private UpdateRecordMapper updateRecordMapper; + @Resource + private DeleteRecordMapper deleteRecordMapper; + @Resource + private RedisTemplate redisTemplate; + + /** + * 分片广播任务 + */ + @XxlJob("followingCountShardingJobHandler") + public void followingCountShardingJobHandler() throws Exception { + // 获取分片参数 + // 分片序号 + int shardIndex = XxlJobHelper.getShardIndex(); + // 分片总数 + int shardTotal = XxlJobHelper.getShardTotal(); + + XxlJobHelper.log("=================> 开始定时分片广播任务:对当日发生变更的用户关注数进行对齐"); + XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal); + + log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal); + + // 表后缀 + String date = LocalDate.now().minusDays(1) // 昨日的日期 + .format(DateTimeFormatter.ofPattern("yyyyMMdd")); // 转字符串 + // 表名后缀 + String tableNameSuffix = TableConstants.buildTableNameSuffix(date, shardIndex); + + // 一批次 1000 条 + int batchSize = 1000; + // 共对齐了多少条记录,默认为 0 + int processedTotal = 0; + + while (true) { + // 1. 分批次查询,如一次查询1000 条,直到查询完毕 + List userIds = selectRecordMapper.selectBatchFromDataAlignFollowingCountTempTable(tableNameSuffix, batchSize); + + // 若记录为空,则结束循环 + if (CollUtil.isEmpty(userIds)) { + break; + } + + // 循环这一批发生变更的用户 ID + userIds.forEach(userId -> { + // 2: 对 t_following 关注表执行 count(*) 操作,获取关注总数 + int followingTotal = selectRecordMapper.selectCountFromFollowingTableByUserId(userId); + + // 3: 更新 t_user_count 表 + int count = updateRecordMapper.updateUserFollowingTotalByUserId(userId, followingTotal); + // 更新对应 Redis 缓存 + if (count > 0) { + String redisKey = RedisKeyConstants.buildCountUserKey(userId); + // 判断 Hash 是否存在 + boolean hashKey = redisTemplate.hasKey(redisKey); + // 若存在 + if (hashKey) { + // 更新 Hash 中的 Field 关注总数 + redisTemplate.opsForHash().put(redisKey, RedisKeyConstants.FIELD_FOLLOWING_TOTAL, followingTotal); + } + } + }); + + + // 4. 批量物理删除这一批次记录 + + deleteRecordMapper.batchDeleteDataAlignFollowingCountTempTable(tableNameSuffix, userIds); + + // 当前已处理的记录数 + processedTotal += userIds.size(); + } + + XxlJobHelper.log("=================> 结束定时分片广播任务:对当日发生变更的用户关注数进行对齐,共对齐记录数:{}", processedTotal); + } + +} \ No newline at end of file diff --git a/han-note-data-align/src/main/resources/mapperxml/DeleteRecordMapper.xml b/han-note-data-align/src/main/resources/mapperxml/DeleteRecordMapper.xml new file mode 100644 index 0000000..121054b --- /dev/null +++ b/han-note-data-align/src/main/resources/mapperxml/DeleteRecordMapper.xml @@ -0,0 +1,12 @@ + + + + + delete + from `t_data_align_following_count_temp_${tableNameSuffix}` + where user_id in + + #{userId} + + + \ No newline at end of file diff --git a/han-note-data-align/src/main/resources/mapperxml/InsertRecordMapper.xml b/han-note-data-align/src/main/resources/mapperxml/InsertMapper.xml similarity index 98% rename from han-note-data-align/src/main/resources/mapperxml/InsertRecordMapper.xml rename to han-note-data-align/src/main/resources/mapperxml/InsertMapper.xml index 0abefac..2fa5b88 100644 --- a/han-note-data-align/src/main/resources/mapperxml/InsertRecordMapper.xml +++ b/han-note-data-align/src/main/resources/mapperxml/InsertMapper.xml @@ -1,6 +1,6 @@ - + insert into `t_data_align_note_like_count_temp_${tableNameSuffix}` (note_id) values (#{noteId}) diff --git a/han-note-data-align/src/main/resources/mapperxml/SelectRecordMapper.xml b/han-note-data-align/src/main/resources/mapperxml/SelectRecordMapper.xml new file mode 100644 index 0000000..fd8741d --- /dev/null +++ b/han-note-data-align/src/main/resources/mapperxml/SelectRecordMapper.xml @@ -0,0 +1,16 @@ + + + + + + + \ No newline at end of file diff --git a/han-note-data-align/src/main/resources/mapperxml/UpdateRecordMapper.xml b/han-note-data-align/src/main/resources/mapperxml/UpdateRecordMapper.xml new file mode 100644 index 0000000..817d082 --- /dev/null +++ b/han-note-data-align/src/main/resources/mapperxml/UpdateRecordMapper.xml @@ -0,0 +1,9 @@ + + + + + update t_user_count + set following_total = #{followingTotal} + where user_id = #{userId} + + \ No newline at end of file