diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml
index 863e3f2..881a5fd 100755
--- a/.idea/inspectionProfiles/Project_Default.xml
+++ b/.idea/inspectionProfiles/Project_Default.xml
@@ -23,6 +23,7 @@
+
diff --git a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/constant/RedisKeyConstants.java b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/constant/RedisKeyConstants.java
index 30bcde7..6b60939 100644
--- a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/constant/RedisKeyConstants.java
+++ b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/constant/RedisKeyConstants.java
@@ -42,10 +42,42 @@ public class RedisKeyConstants {
*/
public static final String FIELD_FOLLOWING_TOTAL = "followingTotal";
+ /**
+ * Hash Field: 粉丝总数
+ */
+ public static final String FIELD_FANS_TOTAL = "fansTotal";
+
+ /**
+ * Hash Field: 笔记发布总数
+ */
+ public static final String FIELD_NOTE_TOTAL = "noteTotal";
+
/**
* 用户维度计数 Key 前缀
*/
private static final String COUNT_USER_KEY_PREFIX = "count:user:";
+ /**
+ * Hash Field: 笔记点赞总数
+ */
+ public static final String FIELD_LIKE_TOTAL = "likeTotal";
+ /**
+ * Hash Field: 笔记收藏总数
+ */
+ public static final String FIELD_COLLECT_TOTAL = "collectTotal";
+ /**
+ * 笔记维度计数 Key 前缀
+ */
+ private static final String COUNT_NOTE_KEY_PREFIX = "count:note:";
+
+ /**
+ * 构建笔记维度计数 Key
+ *
+ * @param noteId 笔记 ID
+ * @return 笔记维度计数 Key
+ */
+ public static String buildCountNoteKey(Long noteId) {
+ return COUNT_NOTE_KEY_PREFIX + noteId;
+ }
/**
* 构建用户维度计数 Key
diff --git a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/DeleteRecordMapper.java b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/DeleteRecordMapper.java
index de3d5ed..aaa95ed 100644
--- a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/DeleteRecordMapper.java
+++ b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/DeleteRecordMapper.java
@@ -8,9 +8,40 @@ public interface DeleteRecordMapper {
/**
* 日增量表:关注数计数变更 - 批量删除
- *
- * @param userIds 用户 ID
*/
void batchDeleteDataAlignFollowingCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
@Param("userIds") List userIds);
+
+ /**
+ * 日增量表:粉丝数计数变更 - 批量删除
+ */
+ void batchDeleteDataAlignFansCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
+ @Param("userIds") List userIds);
+
+ /**
+ * 日增量表:笔记发布数计数变更 - 批量删除
+ */
+ void batchDeleteDataAlignNotePublishCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
+ @Param("userIds") List userIds);
+
+ /**
+ * 日增量表:笔记点赞计数变更 - 批量删除
+ */
+ void batchDeleteDataAlignNoteLikeCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
+ @Param("noteIds") List noteIds);
+
+ void batchDeleteDataAlignNoteCollectCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
+ @Param("noteIds") List noteIds);
+
+ /**
+ * 日增量表:笔记收藏计数变更 - 批量删除
+ */
+ void batchDeleteDataAlignUserCollectCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
+ @Param("userIds") List userIds);
+
+ /**
+ * 日增量表:用户点赞计数变更 - 批量删除
+ */
+ void batchDeleteDataAlignUserLikeCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
+ @Param("userIds") List userIds);
}
\ No newline at end of file
diff --git a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/SelectRecordMapper.java b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/SelectRecordMapper.java
index dbe3af4..b597e42 100644
--- a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/SelectRecordMapper.java
+++ b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/SelectRecordMapper.java
@@ -20,6 +20,24 @@ public interface SelectRecordMapper {
List selectBatchFromDataAlignFollowingCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
@Param("batchSize") int batchSize);
+ /**
+ * 查询 t_fans 粉丝表,获取粉丝总数
+ *
+ * @param userId 用户 ID
+ * @return 粉丝总数
+ */
+ int selectCountFromFansTableByUserId(long userId);
+
+ /**
+ * 日增量表:粉丝数计数变更 - 批量查询
+ *
+ * @param tableNameSuffix 表名后缀
+ * @param batchSize 批量大小
+ * @return 批量查询结果
+ */
+ List selectBatchFromDataAlignFansCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
+ @Param("batchSize") int batchSize);
+
/**
* 查询 t_following 关注表,获取关注总数
*
@@ -27,4 +45,96 @@ public interface SelectRecordMapper {
* @return 关注总数
*/
int selectCountFromFollowingTableByUserId(long userId);
+
+ /**
+ * 日增量表:笔记发布数变更 - 批量查询
+ *
+ * @param tableNameSuffix 表名后缀
+ * @param batchSize 批量大小
+ * @return 批量查询结果
+ */
+ List selectBatchFromDataAlignNotePublishCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
+ @Param("batchSize") int batchSize);
+
+ /**
+ * 批量查询 t_note 笔记表,获取发布总数
+ *
+ * @param userId 用户 ID
+ * @return 发布总数
+ */
+ int selectCountFromNoteTableByUserId(long userId);
+
+
+ /**
+ * 日增量表:笔记点赞数变更 - 批量查询
+ *
+ * @param tableNameSuffix 表名后缀
+ * @param batchSize 批量大小
+ * @return 批量查询结果
+ */
+ List selectBatchFromDataAlignNoteLikeCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
+ @Param("batchSize") int batchSize);
+
+ /**
+ * 查询 t_note_like 笔记点赞表,获取点赞总数
+ *
+ * @param noteId 笔记 ID
+ * @return 点赞总数
+ */
+ int selectCountFromNoteLikeTableByNoteId(long noteId);
+
+ /**
+ * 日增量表:笔记收藏数变更 - 批量查询
+ *
+ * @param tableNameSuffix 表名后缀
+ * @param batchSize 批量大小
+ * @return 批量查询结果
+ */
+ List selectBatchFromDataAlignNoteCollectCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
+ @Param("batchSize") int batchSize);
+
+ /**
+ * 批量查询 t_note_collection 笔记收藏表,获取收藏总数
+ *
+ * @param noteId 笔记 ID
+ * @return 收藏总数
+ */
+ int selectCountFromNoteCollectTableByNoteId(long noteId);
+
+
+ /**
+ * 日增量表:用户收藏数变更 - 批量查询
+ *
+ * @param tableNameSuffix 表名后缀
+ * @param batchSize 批量大小
+ * @return 批量查询结果
+ */
+ List selectBatchFromDataAlignUserCollectCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
+ @Param("batchSize") int batchSize);
+
+ /**
+ * 批量查询 t_note_collection 笔记收藏表,获取收藏总数
+ *
+ * @param userId 用户 ID
+ * @return 收藏总数
+ */
+ int selectUserCollectCountFromNoteCollectionTableByUserId(Long userId);
+
+ /**
+ * 日增量表:用户点赞数变更 - 批量查询
+ *
+ * @param tableNameSuffix 表名后缀
+ * @param batchSize 批量大小
+ * @return 批量查询结果
+ */
+ List selectBatchFromDataAlignUserLikeCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
+ @Param("batchSize") int batchSize);
+
+ /**
+ * 批量查询 t_note_like 笔记点赞表,获取点赞总数
+ *
+ * @param userId 用户 ID
+ * @return 点赞总数
+ */
+ int selectUserLikeCountFromNoteLikeTableByUserId(Long userId);
}
\ No newline at end of file
diff --git a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/UpdateRecordMapper.java b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/UpdateRecordMapper.java
index 461f9ce..0baa60a 100644
--- a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/UpdateRecordMapper.java
+++ b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/UpdateRecordMapper.java
@@ -6,10 +6,44 @@ public interface UpdateRecordMapper {
/**
* 更新 t_user_count 计数表总关注数
- *
- * @param userId 用户 ID
- * @return 更新行数
*/
int updateUserFollowingTotalByUserId(@Param("userId") long userId,
@Param("followingTotal") int followingTotal);
+
+ /**
+ * 更新 t_user_count 计数表总粉丝数
+ */
+ int updateUserFansTotalByUserId(@Param("userId") long userId,
+ @Param("fansTotal") int fansTotal);
+
+ /**
+ * 更新 t_user_count 计数表总笔记数
+ */
+ int updateUserNotePublishTotalByUserId(@Param("userId") Long userId,
+ @Param("notePublishTotal") int notePublishTotal);
+
+ /**
+ * 更新 t_note_count 计数表笔记点赞数
+ */
+ int updateNoteLikeTotalByNoteId(@Param("noteId") long noteId,
+ @Param("noteLikeTotal") int noteLikeTotal);
+
+ /**
+ * 更新 t_note_count 计数表笔记收藏数
+ */
+ int updateNoteCollectTotalByNoteId(@Param("noteId") long noteId,
+ @Param("noteCollectTotal") int noteCollectTotal);
+
+
+ /**
+ * 更新 t_user_count 计数表总收藏数
+ */
+ int updateUserCollectTotalByUserId(@Param("userId") Long userId,
+ @Param("userCollectTotal") int userCollectTotal);
+
+ /**
+ * 更新 t_user_count 计数表总点赞数
+ */
+ int updateUserLikeTotalByUserId(@Param("userId") Long userId,
+ @Param("userLikeTotal") int userLikeTotal);
}
\ No newline at end of file
diff --git a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/job/FansCountShardingXxlJob.java b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/job/FansCountShardingXxlJob.java
new file mode 100644
index 0000000..abd3553
--- /dev/null
+++ b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/job/FansCountShardingXxlJob.java
@@ -0,0 +1,95 @@
+package com.hanserwei.hannote.data.align.job;
+
+import cn.hutool.core.collection.CollUtil;
+import com.hanserwei.hannote.data.align.constant.RedisKeyConstants;
+import com.hanserwei.hannote.data.align.constant.TableConstants;
+import com.hanserwei.hannote.data.align.domain.mapper.DeleteRecordMapper;
+import com.hanserwei.hannote.data.align.domain.mapper.SelectRecordMapper;
+import com.hanserwei.hannote.data.align.domain.mapper.UpdateRecordMapper;
+import com.xxl.job.core.context.XxlJobHelper;
+import com.xxl.job.core.handler.annotation.XxlJob;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+
+@Component
+@Slf4j
+public class FansCountShardingXxlJob {
+
+ @Resource
+ private SelectRecordMapper selectRecordMapper;
+ @Resource
+ private UpdateRecordMapper updateRecordMapper;
+ @Resource
+ private DeleteRecordMapper deleteRecordMapper;
+ @Resource
+ private RedisTemplate redisTemplate;
+
+ /**
+ * 分片广播任务
+ */
+ @XxlJob("fansCountShardingJobHandler")
+ public void fansCountShardingJobHandler() {
+ // 获取分片参数
+ // 分片序号
+ int shardIndex = XxlJobHelper.getShardIndex();
+ // 分片总数
+ int shardTotal = XxlJobHelper.getShardTotal();
+
+ XxlJobHelper.log("=================> 开始定时分片广播任务:对当日发生变更的用户粉丝数进行对齐");
+ XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
+
+ log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
+
+ // 表后缀
+ String date = LocalDate.now().minusDays(1) // 昨日的日期
+ .format(DateTimeFormatter.ofPattern("yyyyMMdd")); // 转字符串
+ // 表名后缀
+ String tableNameSuffix = TableConstants.buildTableNameSuffix(date, shardIndex);
+
+ // 一批次 1000 条
+ int batchSize = 1000;
+ // 共对齐了多少条记录,默认为 0
+ int processedTotal = 0;
+
+ while (true) {
+ // 分批次查询t_data_align_fans_count_temp_日期_分片序号,如一批次查询 1000 条,直到全部查询完成
+ List userIds = selectRecordMapper.selectBatchFromDataAlignFansCountTempTable(tableNameSuffix, batchSize);
+
+ // 若记录为空,则结束循环
+ if (CollUtil.isEmpty(userIds)) {
+ break;
+ }
+
+ // 循环这一批发生变化的用户ID
+ userIds.forEach(userId -> {
+ // 1. 对t_fans进行count(*)操作,获取该用户ID的粉丝总数
+ int fansTotal = selectRecordMapper.selectCountFromFansTableByUserId(userId);
+ // 2. 更新t_user_count的粉丝总数
+ int count = updateRecordMapper.updateUserFansTotalByUserId(userId, fansTotal);
+ // 更新Redis缓存
+ if (count > 0) {
+ String redisKey = RedisKeyConstants.buildCountUserKey(userId);
+ boolean hashKey = redisTemplate.hasKey(redisKey);
+ if (hashKey) {
+ redisTemplate.opsForHash().put(redisKey, RedisKeyConstants.FIELD_FANS_TOTAL, fansTotal);
+ }
+ }
+ });
+
+ // 删除t_data_align_fans_count_temp_日期_分片序号中的记录
+ deleteRecordMapper.batchDeleteDataAlignFansCountTempTable(tableNameSuffix, userIds);
+
+ // 当前已处理的记录数
+ processedTotal += userIds.size();
+ }
+
+ XxlJobHelper.log("=================> 结束定时分片广播任务:对当日发生变更的用户粉丝数进行对齐,共处理 {} 条记录", processedTotal);
+ }
+
+}
\ No newline at end of file
diff --git a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/job/NoteCollectCountShardingXxlJob.java b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/job/NoteCollectCountShardingXxlJob.java
new file mode 100644
index 0000000..c42496e
--- /dev/null
+++ b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/job/NoteCollectCountShardingXxlJob.java
@@ -0,0 +1,94 @@
+package com.hanserwei.hannote.data.align.job;
+
+import cn.hutool.core.collection.CollUtil;
+import com.hanserwei.hannote.data.align.constant.RedisKeyConstants;
+import com.hanserwei.hannote.data.align.constant.TableConstants;
+import com.hanserwei.hannote.data.align.domain.mapper.DeleteRecordMapper;
+import com.hanserwei.hannote.data.align.domain.mapper.SelectRecordMapper;
+import com.hanserwei.hannote.data.align.domain.mapper.UpdateRecordMapper;
+import com.xxl.job.core.context.XxlJobHelper;
+import com.xxl.job.core.handler.annotation.XxlJob;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+
+@Component
+@Slf4j
+public class NoteCollectCountShardingXxlJob {
+
+ @Resource
+ private SelectRecordMapper selectRecordMapper;
+
+ @Resource
+ private UpdateRecordMapper updateRecordMapper;
+
+ @Resource
+ private DeleteRecordMapper deleteRecordMapper;
+
+ @Resource
+ private RedisTemplate redisTemplate;
+
+ @XxlJob("noteCollectCountShardingJobHandler")
+ public void noteCollectCountShardingJobHandler() throws Exception {
+ // 获取分片参数
+ // 分片序号
+ int shardIndex = XxlJobHelper.getShardIndex();
+ // 分片总数
+ int shardTotal = XxlJobHelper.getShardTotal();
+
+ XxlJobHelper.log("=================> 开始定时分片广播任务:对当日发生变更的笔记收藏数进行对齐");
+ XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
+
+ log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
+
+ // 表后缀
+ String date = LocalDate.now().minusDays(1)
+ .format(DateTimeFormatter.ofPattern("yyyyMMdd"));
+ // 表名后缀
+ String tableNameSuffix = TableConstants.buildTableNameSuffix(date, shardIndex);
+
+ // 一批次1000条
+ int batchSize = 1000;
+ // 共对齐多少条数据,默认为0
+ int processedTotal = 0;
+
+ while (true) {
+ // 1. 分批次查询 t_data_align_note_collect_count_temp_日期_分片序号,如一批次查询 1000 条,直到全部查询完成
+ List noteIds = selectRecordMapper.selectBatchFromDataAlignNoteCollectCountTempTable(tableNameSuffix, batchSize);
+
+ // 若查询结果为空,则跳出循环
+ if (CollUtil.isEmpty(noteIds)) {
+ break;
+ }
+
+ noteIds.forEach(noteId -> {
+ int likeTotal = selectRecordMapper.selectCountFromNoteCollectTableByNoteId(noteId);
+
+ // 3: 更新 t_note_count 表, 更新对应 Redis 缓存
+ int count = updateRecordMapper.updateNoteCollectTotalByNoteId(noteId, likeTotal);
+ if (count > 0) {
+ String redisKey = RedisKeyConstants.buildCountNoteKey(noteId);
+ // 判断 Hash 是否存在
+ boolean hashKey = redisTemplate.hasKey(redisKey);
+ // 若存在
+ if (hashKey) {
+ // 更新对应 Redis 缓存
+ redisTemplate.opsForHash().put(redisKey, RedisKeyConstants.FIELD_COLLECT_TOTAL, likeTotal);
+ }
+ }
+ });
+
+ // 4. 批量物理删除这一批次记录
+ deleteRecordMapper.batchDeleteDataAlignNoteCollectCountTempTable(tableNameSuffix, noteIds);
+
+ processedTotal += noteIds.size();
+ }
+
+ XxlJobHelper.log("=================> 结束定时分片广播任务:对当日发生变更的笔记收藏数进行对齐,共对齐记录数:{}", processedTotal);
+ }
+}
diff --git a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/job/NoteLikeCountShardingXxlJob.java b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/job/NoteLikeCountShardingXxlJob.java
new file mode 100644
index 0000000..1b96d9a
--- /dev/null
+++ b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/job/NoteLikeCountShardingXxlJob.java
@@ -0,0 +1,95 @@
+package com.hanserwei.hannote.data.align.job;
+
+import cn.hutool.core.collection.CollUtil;
+import com.hanserwei.hannote.data.align.constant.RedisKeyConstants;
+import com.hanserwei.hannote.data.align.constant.TableConstants;
+import com.hanserwei.hannote.data.align.domain.mapper.DeleteRecordMapper;
+import com.hanserwei.hannote.data.align.domain.mapper.SelectRecordMapper;
+import com.hanserwei.hannote.data.align.domain.mapper.UpdateRecordMapper;
+import com.xxl.job.core.context.XxlJobHelper;
+import com.xxl.job.core.handler.annotation.XxlJob;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+
+@Component
+@Slf4j
+public class NoteLikeCountShardingXxlJob {
+
+ @Resource
+ private SelectRecordMapper selectRecordMapper;
+
+ @Resource
+ private UpdateRecordMapper updateRecordMapper;
+
+ @Resource
+ private RedisTemplate redisTemplate;
+
+ @Resource
+ private DeleteRecordMapper deleteRecordMapper;
+
+ @XxlJob("noteLikeCountShardingJobHandler")
+ public void noteLikeCountShardingJobHandler() throws Exception {
+ // 获取分片参数
+ // 分片序号
+ int shardIndex = XxlJobHelper.getShardIndex();
+ // 分片总数
+ int shardTotal = XxlJobHelper.getShardTotal();
+
+ XxlJobHelper.log("=================> 开始定时分片广播任务:对当日发生变更的笔记点赞数进行对齐");
+ XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
+
+ log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
+
+ // 表后缀
+ String date = LocalDate.now().minusDays(1)
+ .format(DateTimeFormatter.ofPattern("yyyyMMdd"));
+ // 表名后缀
+ String tableNameSuffix = TableConstants.buildTableNameSuffix(date, shardIndex);
+
+ // 一批次1000条
+ int batchSize = 1000;
+ // 共对齐多少条数据,默认为0
+ int processedTotal = 0;
+
+ while (true) {
+ // 1. 分批次查询 t_data_align_note_like_count_temp_日期_分片序号,如一批次查询 1000 条,直到全部查询完成
+ List noteIds = selectRecordMapper.selectBatchFromDataAlignNoteLikeCountTempTable(tableNameSuffix, batchSize);
+
+ // 若记录为空,则结束循环
+ if (CollUtil.isEmpty(noteIds)) {
+ break;
+ }
+
+ // 循环这一批发生变更的笔记 ID
+ noteIds.forEach(noteId -> {
+ // 对 t_note_like 关注表执行 count(*) 操作,获取关注总数
+ int likeTotal = selectRecordMapper.selectCountFromNoteLikeTableByNoteId(noteId);
+
+ // 3: 更新 t_note_count 表, 更新对应 Redis 缓存
+ int count = updateRecordMapper.updateNoteLikeTotalByNoteId(noteId, likeTotal);
+ // 更新对应 Redis 缓存
+ if (count > 0) {
+ String redisKey = RedisKeyConstants.buildCountNoteKey(noteId);
+ // 判断 Hash 是否存在
+ boolean hashKey = redisTemplate.hasKey(redisKey);
+ // 若存在
+ if (hashKey) {
+ redisTemplate.opsForHash().put(redisKey, RedisKeyConstants.FIELD_LIKE_TOTAL, likeTotal);
+ }
+ }
+ });
+ // 4. 批量物理删除这一批次记录
+ deleteRecordMapper.batchDeleteDataAlignNoteLikeCountTempTable(tableNameSuffix, noteIds);
+
+ // 当前处理的记录数
+ processedTotal += noteIds.size();
+ }
+ XxlJobHelper.log("=================> 结束定时分片广播任务:对当日发生变更的笔记点赞数进行对齐,共对齐记录数:{}", processedTotal);
+ }
+}
diff --git a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/job/NotePublishCountShardingXxlJob.java b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/job/NotePublishCountShardingXxlJob.java
new file mode 100644
index 0000000..0cd1699
--- /dev/null
+++ b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/job/NotePublishCountShardingXxlJob.java
@@ -0,0 +1,84 @@
+package com.hanserwei.hannote.data.align.job;
+
+import cn.hutool.core.collection.CollUtil;
+import com.hanserwei.hannote.data.align.constant.RedisKeyConstants;
+import com.hanserwei.hannote.data.align.constant.TableConstants;
+import com.hanserwei.hannote.data.align.domain.mapper.DeleteRecordMapper;
+import com.hanserwei.hannote.data.align.domain.mapper.SelectRecordMapper;
+import com.hanserwei.hannote.data.align.domain.mapper.UpdateRecordMapper;
+import com.xxl.job.core.context.XxlJobHelper;
+import com.xxl.job.core.handler.annotation.XxlJob;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+
+@Component
+@Slf4j
+public class NotePublishCountShardingXxlJob {
+
+ @Resource
+ private SelectRecordMapper selectRecordMapper;
+ @Resource
+ private UpdateRecordMapper updateRecordMapper;
+ @Resource
+ private DeleteRecordMapper deleteRecordMapper;
+ @Resource
+ private RedisTemplate redisTemplate;
+
+ @XxlJob("notePublishCountShardingJobHandler")
+ public void notePublishCountShardingJobHandler() throws Exception {
+ // 获取分片参数
+ // 分片序号
+ int shardIndex = XxlJobHelper.getShardIndex();
+ // 分片总数
+ int shardTotal = XxlJobHelper.getShardTotal();
+
+ XxlJobHelper.log("=================> 开始定时分片广播任务:对当日发生变更的笔记发布数进行对齐");
+ XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
+
+ log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
+
+ // 表后缀
+ String date = LocalDate.now().minusDays(1) // 昨日的日期
+ .format(DateTimeFormatter.ofPattern("yyyyMMdd")); // 转字符串
+ // 表名后缀
+ String tableNameSuffix = TableConstants.buildTableNameSuffix(date, shardIndex);
+
+ // 一批次 1000 条
+ int batchSize = 1000;
+ // 共对齐了多少条记录,默认为 0
+ int processedTotal = 0;
+
+ while (true) {
+ // 分批次查询t_data_align_note_publish_count_temp_日期_分片序号,如一批次查询 1000 条,直到全部查询完成
+ List userIds = selectRecordMapper.selectBatchFromDataAlignNotePublishCountTempTable(tableNameSuffix, batchSize);
+ // 若记录为空,则结束循环
+ if (CollUtil.isEmpty(userIds)) {
+ break;
+ }
+ userIds.forEach(userId -> {
+ int notePublishTotal = selectRecordMapper.selectCountFromNoteTableByUserId(userId);
+ //3: 更新 t_user_count 表
+ int count = updateRecordMapper.updateUserNotePublishTotalByUserId(userId, notePublishTotal);
+ if (count > 0) {
+ String redisKey = RedisKeyConstants.buildCountUserKey(userId);
+ boolean hashKey = redisTemplate.hasKey(redisKey);
+ if (hashKey) {
+ redisTemplate.opsForHash().put(redisKey, RedisKeyConstants.FIELD_NOTE_TOTAL, notePublishTotal);
+ }
+ }
+ });
+
+ // 删除 t_data_align_note_publish_count_temp_日期_分片序号
+ deleteRecordMapper.batchDeleteDataAlignNotePublishCountTempTable(tableNameSuffix, userIds);
+
+ processedTotal += userIds.size();
+ }
+ XxlJobHelper.log("=================> 结束定时分片广播任务:对当日发生变更的用户笔记发布数进行对齐,共对齐记录数:{}", processedTotal);
+ }
+}
diff --git a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/job/UserCollectCountShardingXxlJob.java b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/job/UserCollectCountShardingXxlJob.java
new file mode 100644
index 0000000..8f2a569
--- /dev/null
+++ b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/job/UserCollectCountShardingXxlJob.java
@@ -0,0 +1,89 @@
+package com.hanserwei.hannote.data.align.job;
+
+import cn.hutool.core.collection.CollUtil;
+import com.hanserwei.hannote.data.align.constant.RedisKeyConstants;
+import com.hanserwei.hannote.data.align.constant.TableConstants;
+import com.hanserwei.hannote.data.align.domain.mapper.DeleteRecordMapper;
+import com.hanserwei.hannote.data.align.domain.mapper.SelectRecordMapper;
+import com.hanserwei.hannote.data.align.domain.mapper.UpdateRecordMapper;
+import com.xxl.job.core.context.XxlJobHelper;
+import com.xxl.job.core.handler.annotation.XxlJob;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+
+@Component
+@Slf4j
+public class UserCollectCountShardingXxlJob {
+
+ @Resource
+ private SelectRecordMapper selectRecordMapper;
+ @Resource
+ private UpdateRecordMapper updateRecordMapper;
+ @Resource
+ private DeleteRecordMapper deleteRecordMapper;
+ @Resource
+ private RedisTemplate redisTemplate;
+
+ @XxlJob("userCollectCountShardingJobHandler")
+ public void userCollectCountShardingJobHandler() throws Exception {
+ // 获取分片参数
+ // 分片序号
+ int shardIndex = XxlJobHelper.getShardIndex();
+ // 分片总数
+ int shardTotal = XxlJobHelper.getShardTotal();
+
+ XxlJobHelper.log("=================> 开始定时分片广播任务:对当日发生变更的用户收藏数进行对齐");
+ XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
+
+ log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
+
+ // 表后缀
+ String date = LocalDate.now().minusDays(1) // 昨日的日期
+ .format(DateTimeFormatter.ofPattern("yyyyMMdd")); // 转字符串
+ // 表名后缀
+ String tableNameSuffix = TableConstants.buildTableNameSuffix(date, shardIndex);
+
+ // 一批次 1000 条
+ int batchSize = 1000;
+ // 共对齐了多少条记录,默认为 0
+ int processedTotal = 0;
+
+ while (true) {
+ // 分批次查询t_data_align_user_collect_count_temp_日期_分片序号,如一批次查询 1000 条,直到全部查询完成
+ List userIds = selectRecordMapper.selectBatchFromDataAlignUserCollectCountTempTable(tableNameSuffix, batchSize);
+
+ // 若记录为空,则结束循环
+ if (CollUtil.isEmpty(userIds)) {
+ break;
+ }
+
+ userIds.forEach(userId -> {
+ //2: 对 t_user_collection 用户收藏表执行 count(*) 操作,获取用户获得的收藏总数
+ int userCollectTotal = selectRecordMapper.selectUserCollectCountFromNoteCollectionTableByUserId(userId);
+
+ // 3: 更新 t_user_count 用户计数表
+ int count = updateRecordMapper.updateUserCollectTotalByUserId(userId, userCollectTotal);
+
+ if (count > 0) {
+ String redisKey = RedisKeyConstants.buildCountUserKey(userId);
+ boolean hashKey = redisTemplate.hasKey(redisKey);
+ if (hashKey) {
+ redisTemplate.opsForHash().put(redisKey, RedisKeyConstants.FIELD_COLLECT_TOTAL, userCollectTotal);
+ }
+ }
+ });
+
+ // 4: 删除 t_data_align_user_collect_count_temp_日期_分片序号
+ deleteRecordMapper.batchDeleteDataAlignUserCollectCountTempTable(tableNameSuffix, userIds);
+
+ processedTotal += userIds.size();
+ }
+ XxlJobHelper.log("=================> 结束定时分片广播任务:对当日发生变更的用户收藏数进行对齐,共处理 {} 条记录", processedTotal);
+ }
+}
diff --git a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/job/UserLikeCountShardingXxlJob.java b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/job/UserLikeCountShardingXxlJob.java
new file mode 100644
index 0000000..e487cc7
--- /dev/null
+++ b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/job/UserLikeCountShardingXxlJob.java
@@ -0,0 +1,88 @@
+package com.hanserwei.hannote.data.align.job;
+
+import cn.hutool.core.collection.CollUtil;
+import com.hanserwei.hannote.data.align.constant.RedisKeyConstants;
+import com.hanserwei.hannote.data.align.constant.TableConstants;
+import com.hanserwei.hannote.data.align.domain.mapper.DeleteRecordMapper;
+import com.hanserwei.hannote.data.align.domain.mapper.SelectRecordMapper;
+import com.hanserwei.hannote.data.align.domain.mapper.UpdateRecordMapper;
+import com.xxl.job.core.context.XxlJobHelper;
+import com.xxl.job.core.handler.annotation.XxlJob;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+
+@Component
+@Slf4j
+public class UserLikeCountShardingXxlJob {
+
+ @Resource
+ private SelectRecordMapper selectRecordMapper;
+ @Resource
+ private UpdateRecordMapper updateRecordMapper;
+ @Resource
+ private DeleteRecordMapper deleteRecordMapper;
+ @Resource
+ private RedisTemplate redisTemplate;
+
+ @XxlJob("userLikeCountShardingJobHandler")
+ public void userLikeCountShardingJobHandler() throws Exception {
+ // 获取分片参数
+ // 分片序号
+ int shardIndex = XxlJobHelper.getShardIndex();
+ // 分片总数
+ int shardTotal = XxlJobHelper.getShardTotal();
+
+ XxlJobHelper.log("=================> 开始定时分片广播任务:对当日发生变更的用户点赞数进行对齐");
+ XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
+
+ log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
+
+ // 表后缀
+ String date = LocalDate.now().minusDays(1) // 昨日的日期
+ .format(DateTimeFormatter.ofPattern("yyyyMMdd")); // 转字符串
+ // 表名后缀
+ String tableNameSuffix = TableConstants.buildTableNameSuffix(date, shardIndex);
+
+ // 一批次 1000 条
+ int batchSize = 1000;
+ // 共对齐了多少条记录,默认为 0
+ int processedTotal = 0;
+
+ while (true) {
+ // 分批次查询t_data_align_user_like_count_temp_日期_分片序号,如一批次查询 1000 条,直到全部查询完成
+ List userIds = selectRecordMapper.selectBatchFromDataAlignUserLikeCountTempTable(tableNameSuffix, batchSize);
+
+ if (CollUtil.isEmpty(userIds)) {
+ break;
+ }
+
+ userIds.forEach(userId -> {
+ // 2: 对 t_user_like 用户点赞表执行 count(*) 操作,获取用户获得的点赞总数
+ int userLikeTotal = selectRecordMapper.selectUserLikeCountFromNoteLikeTableByUserId(userId);
+
+ // 3: 更新 t_user_count 用户表,将用户点赞总数更新到 t_user_count 表中
+ int count = updateRecordMapper.updateUserLikeTotalByUserId(userId, userLikeTotal);
+
+ if (count > 0) {
+ String redisKey = RedisKeyConstants.buildCountUserKey(userId);
+ boolean hashKey = redisTemplate.hasKey(redisKey);
+ if (hashKey) {
+ redisTemplate.opsForHash().put(redisKey, RedisKeyConstants.FIELD_LIKE_TOTAL, userLikeTotal);
+ }
+ }
+ });
+
+ // 4: 删除 t_data_align_user_like_count_temp_日期_分片序号
+ deleteRecordMapper.batchDeleteDataAlignUserLikeCountTempTable(tableNameSuffix, userIds);
+
+ processedTotal += userIds.size();
+ }
+ XxlJobHelper.log("=================> 结束定时分片广播任务:对当日发生变更的用户点赞数进行对齐,共对齐记录数:{}", processedTotal);
+ }
+}
diff --git a/han-note-data-align/src/main/resources/mapperxml/DeleteRecordMapper.xml b/han-note-data-align/src/main/resources/mapperxml/DeleteRecordMapper.xml
index 121054b..6008db7 100644
--- a/han-note-data-align/src/main/resources/mapperxml/DeleteRecordMapper.xml
+++ b/han-note-data-align/src/main/resources/mapperxml/DeleteRecordMapper.xml
@@ -9,4 +9,58 @@
#{userId}
+
+
+ delete
+ from `t_data_align_fans_count_temp_${tableNameSuffix}`
+ where user_id in
+
+ #{userId}
+
+
+
+
+ delete
+ from `t_data_align_note_publish_count_temp_${tableNameSuffix}`
+ where user_id in
+
+ #{userId}
+
+
+
+
+ delete
+ from `t_data_align_note_like_count_temp_${tableNameSuffix}`
+ where note_id in
+
+ #{noteId}
+
+
+
+
+ delete
+ from `t_data_align_note_collect_count_temp_${tableNameSuffix}`
+ where note_id in
+
+ #{noteId}
+
+
+
+
+ delete
+ from `t_data_align_user_collect_count_temp_${tableNameSuffix}`
+ where user_id in
+
+ #{userId}
+
+
+
+
+ delete
+ from `t_data_align_user_like_count_temp_${tableNameSuffix}`
+ where user_id in
+
+ #{userId}
+
+
\ No newline at end of file
diff --git a/han-note-data-align/src/main/resources/mapperxml/SelectRecordMapper.xml b/han-note-data-align/src/main/resources/mapperxml/SelectRecordMapper.xml
index fd8741d..8c04f4e 100644
--- a/han-note-data-align/src/main/resources/mapperxml/SelectRecordMapper.xml
+++ b/han-note-data-align/src/main/resources/mapperxml/SelectRecordMapper.xml
@@ -13,4 +13,88 @@
from t_following
where user_id = #{userId}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/han-note-data-align/src/main/resources/mapperxml/UpdateRecordMapper.xml b/han-note-data-align/src/main/resources/mapperxml/UpdateRecordMapper.xml
index 817d082..536d994 100644
--- a/han-note-data-align/src/main/resources/mapperxml/UpdateRecordMapper.xml
+++ b/han-note-data-align/src/main/resources/mapperxml/UpdateRecordMapper.xml
@@ -6,4 +6,40 @@
set following_total = #{followingTotal}
where user_id = #{userId}
+
+
+ update t_user_count
+ set fans_total = #{fansTotal}
+ where user_id = #{userId}
+
+
+
+ update t_user_count
+ set collect_total = #{userCollectTotal}
+ where user_id = #{userId}
+
+
+
+ update t_user_count
+ set note_total = #{notePublishTotal}
+ where user_id = #{userId}
+
+
+
+ update t_note_count
+ set like_total = #{noteLikeTotal}
+ where note_id = #{noteId}
+
+
+
+ update t_note_count
+ set collect_total = #{noteCollectTotal}
+ where note_id = #{noteId}
+
+
+
+ update t_user_count
+ set like_total = #{userLikeTotal}
+ where user_id = #{userId}
+
\ No newline at end of file