feat(data-align): 新增多种计数对齐任务及批量删除支持

- 新增粉丝数、笔记发布数、笔记点赞数、笔记收藏数、用户收藏数、用户点赞数等计数对齐任务
- 扩展 DeleteRecordMapper 支持多种计数变更表的批量删除操作
- 新增 SelectRecordMapper 查询方法支持各类计数表数据批量获取- 新增 UpdateRecordMapper 更新方法支持多维度计数表更新
- 完善 Redis 缓存更新逻辑,支持用户和笔记维度的计数缓存同步
- 添加对应的 XML 映射文件 SQL 实现,支持分片表结构动态拼接- 优化计数对齐任务处理流程,提升数据一致性保障能力
This commit is contained in:
2025-10-25 23:21:37 +08:00
parent d1f756d5c8
commit 6cc5c06879
14 changed files with 932 additions and 5 deletions

View File

@@ -42,10 +42,42 @@ public class RedisKeyConstants {
*/
public static final String FIELD_FOLLOWING_TOTAL = "followingTotal";
/**
* Hash Field: 粉丝总数
*/
public static final String FIELD_FANS_TOTAL = "fansTotal";
/**
* Hash Field: 笔记发布总数
*/
public static final String FIELD_NOTE_TOTAL = "noteTotal";
/**
* 用户维度计数 Key 前缀
*/
private static final String COUNT_USER_KEY_PREFIX = "count:user:";
/**
* Hash Field: 笔记点赞总数
*/
public static final String FIELD_LIKE_TOTAL = "likeTotal";
/**
* Hash Field: 笔记收藏总数
*/
public static final String FIELD_COLLECT_TOTAL = "collectTotal";
/**
* 笔记维度计数 Key 前缀
*/
private static final String COUNT_NOTE_KEY_PREFIX = "count:note:";
/**
* 构建笔记维度计数 Key
*
* @param noteId 笔记 ID
* @return 笔记维度计数 Key
*/
public static String buildCountNoteKey(Long noteId) {
return COUNT_NOTE_KEY_PREFIX + noteId;
}
/**
* 构建用户维度计数 Key

View File

@@ -8,9 +8,40 @@ public interface DeleteRecordMapper {
/**
* 日增量表:关注数计数变更 - 批量删除
*
* @param userIds 用户 ID
*/
void batchDeleteDataAlignFollowingCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
@Param("userIds") List<Long> userIds);
/**
* 日增量表:粉丝数计数变更 - 批量删除
*/
void batchDeleteDataAlignFansCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
@Param("userIds") List<Long> userIds);
/**
* 日增量表:笔记发布数计数变更 - 批量删除
*/
void batchDeleteDataAlignNotePublishCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
@Param("userIds") List<Long> userIds);
/**
* 日增量表:笔记点赞计数变更 - 批量删除
*/
void batchDeleteDataAlignNoteLikeCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
@Param("noteIds") List<Long> noteIds);
void batchDeleteDataAlignNoteCollectCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
@Param("noteIds") List<Long> noteIds);
/**
* 日增量表:笔记收藏计数变更 - 批量删除
*/
void batchDeleteDataAlignUserCollectCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
@Param("userIds") List<Long> userIds);
/**
* 日增量表:用户点赞计数变更 - 批量删除
*/
void batchDeleteDataAlignUserLikeCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
@Param("userIds") List<Long> userIds);
}

View File

@@ -20,6 +20,24 @@ public interface SelectRecordMapper {
List<Long> selectBatchFromDataAlignFollowingCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
@Param("batchSize") int batchSize);
/**
* 查询 t_fans 粉丝表,获取粉丝总数
*
* @param userId 用户 ID
* @return 粉丝总数
*/
int selectCountFromFansTableByUserId(long userId);
/**
* 日增量表:粉丝数计数变更 - 批量查询
*
* @param tableNameSuffix 表名后缀
* @param batchSize 批量大小
* @return 批量查询结果
*/
List<Long> selectBatchFromDataAlignFansCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
@Param("batchSize") int batchSize);
/**
* 查询 t_following 关注表,获取关注总数
*
@@ -27,4 +45,96 @@ public interface SelectRecordMapper {
* @return 关注总数
*/
int selectCountFromFollowingTableByUserId(long userId);
/**
* 日增量表:笔记发布数变更 - 批量查询
*
* @param tableNameSuffix 表名后缀
* @param batchSize 批量大小
* @return 批量查询结果
*/
List<Long> selectBatchFromDataAlignNotePublishCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
@Param("batchSize") int batchSize);
/**
* 批量查询 t_note 笔记表,获取发布总数
*
* @param userId 用户 ID
* @return 发布总数
*/
int selectCountFromNoteTableByUserId(long userId);
/**
* 日增量表:笔记点赞数变更 - 批量查询
*
* @param tableNameSuffix 表名后缀
* @param batchSize 批量大小
* @return 批量查询结果
*/
List<Long> selectBatchFromDataAlignNoteLikeCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
@Param("batchSize") int batchSize);
/**
* 查询 t_note_like 笔记点赞表,获取点赞总数
*
* @param noteId 笔记 ID
* @return 点赞总数
*/
int selectCountFromNoteLikeTableByNoteId(long noteId);
/**
* 日增量表:笔记收藏数变更 - 批量查询
*
* @param tableNameSuffix 表名后缀
* @param batchSize 批量大小
* @return 批量查询结果
*/
List<Long> selectBatchFromDataAlignNoteCollectCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
@Param("batchSize") int batchSize);
/**
* 批量查询 t_note_collection 笔记收藏表,获取收藏总数
*
* @param noteId 笔记 ID
* @return 收藏总数
*/
int selectCountFromNoteCollectTableByNoteId(long noteId);
/**
* 日增量表:用户收藏数变更 - 批量查询
*
* @param tableNameSuffix 表名后缀
* @param batchSize 批量大小
* @return 批量查询结果
*/
List<Long> selectBatchFromDataAlignUserCollectCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
@Param("batchSize") int batchSize);
/**
* 批量查询 t_note_collection 笔记收藏表,获取收藏总数
*
* @param userId 用户 ID
* @return 收藏总数
*/
int selectUserCollectCountFromNoteCollectionTableByUserId(Long userId);
/**
* 日增量表:用户点赞数变更 - 批量查询
*
* @param tableNameSuffix 表名后缀
* @param batchSize 批量大小
* @return 批量查询结果
*/
List<Long> selectBatchFromDataAlignUserLikeCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
@Param("batchSize") int batchSize);
/**
* 批量查询 t_note_like 笔记点赞表,获取点赞总数
*
* @param userId 用户 ID
* @return 点赞总数
*/
int selectUserLikeCountFromNoteLikeTableByUserId(Long userId);
}

View File

@@ -6,10 +6,44 @@ public interface UpdateRecordMapper {
/**
* 更新 t_user_count 计数表总关注数
*
* @param userId 用户 ID
* @return 更新行数
*/
int updateUserFollowingTotalByUserId(@Param("userId") long userId,
@Param("followingTotal") int followingTotal);
/**
* 更新 t_user_count 计数表总粉丝数
*/
int updateUserFansTotalByUserId(@Param("userId") long userId,
@Param("fansTotal") int fansTotal);
/**
* 更新 t_user_count 计数表总笔记数
*/
int updateUserNotePublishTotalByUserId(@Param("userId") Long userId,
@Param("notePublishTotal") int notePublishTotal);
/**
* 更新 t_note_count 计数表笔记点赞数
*/
int updateNoteLikeTotalByNoteId(@Param("noteId") long noteId,
@Param("noteLikeTotal") int noteLikeTotal);
/**
* 更新 t_note_count 计数表笔记收藏数
*/
int updateNoteCollectTotalByNoteId(@Param("noteId") long noteId,
@Param("noteCollectTotal") int noteCollectTotal);
/**
* 更新 t_user_count 计数表总收藏数
*/
int updateUserCollectTotalByUserId(@Param("userId") Long userId,
@Param("userCollectTotal") int userCollectTotal);
/**
* 更新 t_user_count 计数表总点赞数
*/
int updateUserLikeTotalByUserId(@Param("userId") Long userId,
@Param("userLikeTotal") int userLikeTotal);
}

View File

@@ -0,0 +1,95 @@
package com.hanserwei.hannote.data.align.job;
import cn.hutool.core.collection.CollUtil;
import com.hanserwei.hannote.data.align.constant.RedisKeyConstants;
import com.hanserwei.hannote.data.align.constant.TableConstants;
import com.hanserwei.hannote.data.align.domain.mapper.DeleteRecordMapper;
import com.hanserwei.hannote.data.align.domain.mapper.SelectRecordMapper;
import com.hanserwei.hannote.data.align.domain.mapper.UpdateRecordMapper;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
@Component
@Slf4j
public class FansCountShardingXxlJob {
@Resource
private SelectRecordMapper selectRecordMapper;
@Resource
private UpdateRecordMapper updateRecordMapper;
@Resource
private DeleteRecordMapper deleteRecordMapper;
@Resource
private RedisTemplate<String, Object> redisTemplate;
/**
* 分片广播任务
*/
@XxlJob("fansCountShardingJobHandler")
public void fansCountShardingJobHandler() {
// 获取分片参数
// 分片序号
int shardIndex = XxlJobHelper.getShardIndex();
// 分片总数
int shardTotal = XxlJobHelper.getShardTotal();
XxlJobHelper.log("=================> 开始定时分片广播任务:对当日发生变更的用户粉丝数进行对齐");
XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
// 表后缀
String date = LocalDate.now().minusDays(1) // 昨日的日期
.format(DateTimeFormatter.ofPattern("yyyyMMdd")); // 转字符串
// 表名后缀
String tableNameSuffix = TableConstants.buildTableNameSuffix(date, shardIndex);
// 一批次 1000 条
int batchSize = 1000;
// 共对齐了多少条记录,默认为 0
int processedTotal = 0;
while (true) {
// 分批次查询t_data_align_fans_count_temp_日期_分片序号如一批次查询 1000 条,直到全部查询完成
List<Long> userIds = selectRecordMapper.selectBatchFromDataAlignFansCountTempTable(tableNameSuffix, batchSize);
// 若记录为空,则结束循环
if (CollUtil.isEmpty(userIds)) {
break;
}
// 循环这一批发生变化的用户ID
userIds.forEach(userId -> {
// 1. 对t_fans进行count(*)操作获取该用户ID的粉丝总数
int fansTotal = selectRecordMapper.selectCountFromFansTableByUserId(userId);
// 2. 更新t_user_count的粉丝总数
int count = updateRecordMapper.updateUserFansTotalByUserId(userId, fansTotal);
// 更新Redis缓存
if (count > 0) {
String redisKey = RedisKeyConstants.buildCountUserKey(userId);
boolean hashKey = redisTemplate.hasKey(redisKey);
if (hashKey) {
redisTemplate.opsForHash().put(redisKey, RedisKeyConstants.FIELD_FANS_TOTAL, fansTotal);
}
}
});
// 删除t_data_align_fans_count_temp_日期_分片序号中的记录
deleteRecordMapper.batchDeleteDataAlignFansCountTempTable(tableNameSuffix, userIds);
// 当前已处理的记录数
processedTotal += userIds.size();
}
XxlJobHelper.log("=================> 结束定时分片广播任务:对当日发生变更的用户粉丝数进行对齐,共处理 {} 条记录", processedTotal);
}
}

View File

@@ -0,0 +1,94 @@
package com.hanserwei.hannote.data.align.job;
import cn.hutool.core.collection.CollUtil;
import com.hanserwei.hannote.data.align.constant.RedisKeyConstants;
import com.hanserwei.hannote.data.align.constant.TableConstants;
import com.hanserwei.hannote.data.align.domain.mapper.DeleteRecordMapper;
import com.hanserwei.hannote.data.align.domain.mapper.SelectRecordMapper;
import com.hanserwei.hannote.data.align.domain.mapper.UpdateRecordMapper;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
@Component
@Slf4j
public class NoteCollectCountShardingXxlJob {
@Resource
private SelectRecordMapper selectRecordMapper;
@Resource
private UpdateRecordMapper updateRecordMapper;
@Resource
private DeleteRecordMapper deleteRecordMapper;
@Resource
private RedisTemplate<String, Object> redisTemplate;
@XxlJob("noteCollectCountShardingJobHandler")
public void noteCollectCountShardingJobHandler() throws Exception {
// 获取分片参数
// 分片序号
int shardIndex = XxlJobHelper.getShardIndex();
// 分片总数
int shardTotal = XxlJobHelper.getShardTotal();
XxlJobHelper.log("=================> 开始定时分片广播任务:对当日发生变更的笔记收藏数进行对齐");
XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
// 表后缀
String date = LocalDate.now().minusDays(1)
.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
// 表名后缀
String tableNameSuffix = TableConstants.buildTableNameSuffix(date, shardIndex);
// 一批次1000条
int batchSize = 1000;
// 共对齐多少条数据默认为0
int processedTotal = 0;
while (true) {
// 1. 分批次查询 t_data_align_note_collect_count_temp_日期_分片序号如一批次查询 1000 条,直到全部查询完成
List<Long> noteIds = selectRecordMapper.selectBatchFromDataAlignNoteCollectCountTempTable(tableNameSuffix, batchSize);
// 若查询结果为空,则跳出循环
if (CollUtil.isEmpty(noteIds)) {
break;
}
noteIds.forEach(noteId -> {
int likeTotal = selectRecordMapper.selectCountFromNoteCollectTableByNoteId(noteId);
// 3: 更新 t_note_count 表, 更新对应 Redis 缓存
int count = updateRecordMapper.updateNoteCollectTotalByNoteId(noteId, likeTotal);
if (count > 0) {
String redisKey = RedisKeyConstants.buildCountNoteKey(noteId);
// 判断 Hash 是否存在
boolean hashKey = redisTemplate.hasKey(redisKey);
// 若存在
if (hashKey) {
// 更新对应 Redis 缓存
redisTemplate.opsForHash().put(redisKey, RedisKeyConstants.FIELD_COLLECT_TOTAL, likeTotal);
}
}
});
// 4. 批量物理删除这一批次记录
deleteRecordMapper.batchDeleteDataAlignNoteCollectCountTempTable(tableNameSuffix, noteIds);
processedTotal += noteIds.size();
}
XxlJobHelper.log("=================> 结束定时分片广播任务:对当日发生变更的笔记收藏数进行对齐,共对齐记录数:{}", processedTotal);
}
}

View File

@@ -0,0 +1,95 @@
package com.hanserwei.hannote.data.align.job;
import cn.hutool.core.collection.CollUtil;
import com.hanserwei.hannote.data.align.constant.RedisKeyConstants;
import com.hanserwei.hannote.data.align.constant.TableConstants;
import com.hanserwei.hannote.data.align.domain.mapper.DeleteRecordMapper;
import com.hanserwei.hannote.data.align.domain.mapper.SelectRecordMapper;
import com.hanserwei.hannote.data.align.domain.mapper.UpdateRecordMapper;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
@Component
@Slf4j
public class NoteLikeCountShardingXxlJob {
@Resource
private SelectRecordMapper selectRecordMapper;
@Resource
private UpdateRecordMapper updateRecordMapper;
@Resource
private RedisTemplate<String, Object> redisTemplate;
@Resource
private DeleteRecordMapper deleteRecordMapper;
@XxlJob("noteLikeCountShardingJobHandler")
public void noteLikeCountShardingJobHandler() throws Exception {
// 获取分片参数
// 分片序号
int shardIndex = XxlJobHelper.getShardIndex();
// 分片总数
int shardTotal = XxlJobHelper.getShardTotal();
XxlJobHelper.log("=================> 开始定时分片广播任务:对当日发生变更的笔记点赞数进行对齐");
XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
// 表后缀
String date = LocalDate.now().minusDays(1)
.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
// 表名后缀
String tableNameSuffix = TableConstants.buildTableNameSuffix(date, shardIndex);
// 一批次1000条
int batchSize = 1000;
// 共对齐多少条数据默认为0
int processedTotal = 0;
while (true) {
// 1. 分批次查询 t_data_align_note_like_count_temp_日期_分片序号如一批次查询 1000 条,直到全部查询完成
List<Long> noteIds = selectRecordMapper.selectBatchFromDataAlignNoteLikeCountTempTable(tableNameSuffix, batchSize);
// 若记录为空,则结束循环
if (CollUtil.isEmpty(noteIds)) {
break;
}
// 循环这一批发生变更的笔记 ID
noteIds.forEach(noteId -> {
// 对 t_note_like 关注表执行 count(*) 操作,获取关注总数
int likeTotal = selectRecordMapper.selectCountFromNoteLikeTableByNoteId(noteId);
// 3: 更新 t_note_count 表, 更新对应 Redis 缓存
int count = updateRecordMapper.updateNoteLikeTotalByNoteId(noteId, likeTotal);
// 更新对应 Redis 缓存
if (count > 0) {
String redisKey = RedisKeyConstants.buildCountNoteKey(noteId);
// 判断 Hash 是否存在
boolean hashKey = redisTemplate.hasKey(redisKey);
// 若存在
if (hashKey) {
redisTemplate.opsForHash().put(redisKey, RedisKeyConstants.FIELD_LIKE_TOTAL, likeTotal);
}
}
});
// 4. 批量物理删除这一批次记录
deleteRecordMapper.batchDeleteDataAlignNoteLikeCountTempTable(tableNameSuffix, noteIds);
// 当前处理的记录数
processedTotal += noteIds.size();
}
XxlJobHelper.log("=================> 结束定时分片广播任务:对当日发生变更的笔记点赞数进行对齐,共对齐记录数:{}", processedTotal);
}
}

View File

@@ -0,0 +1,84 @@
package com.hanserwei.hannote.data.align.job;
import cn.hutool.core.collection.CollUtil;
import com.hanserwei.hannote.data.align.constant.RedisKeyConstants;
import com.hanserwei.hannote.data.align.constant.TableConstants;
import com.hanserwei.hannote.data.align.domain.mapper.DeleteRecordMapper;
import com.hanserwei.hannote.data.align.domain.mapper.SelectRecordMapper;
import com.hanserwei.hannote.data.align.domain.mapper.UpdateRecordMapper;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
@Component
@Slf4j
public class NotePublishCountShardingXxlJob {
@Resource
private SelectRecordMapper selectRecordMapper;
@Resource
private UpdateRecordMapper updateRecordMapper;
@Resource
private DeleteRecordMapper deleteRecordMapper;
@Resource
private RedisTemplate<String, Object> redisTemplate;
@XxlJob("notePublishCountShardingJobHandler")
public void notePublishCountShardingJobHandler() throws Exception {
// 获取分片参数
// 分片序号
int shardIndex = XxlJobHelper.getShardIndex();
// 分片总数
int shardTotal = XxlJobHelper.getShardTotal();
XxlJobHelper.log("=================> 开始定时分片广播任务:对当日发生变更的笔记发布数进行对齐");
XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
// 表后缀
String date = LocalDate.now().minusDays(1) // 昨日的日期
.format(DateTimeFormatter.ofPattern("yyyyMMdd")); // 转字符串
// 表名后缀
String tableNameSuffix = TableConstants.buildTableNameSuffix(date, shardIndex);
// 一批次 1000 条
int batchSize = 1000;
// 共对齐了多少条记录,默认为 0
int processedTotal = 0;
while (true) {
// 分批次查询t_data_align_note_publish_count_temp_日期_分片序号如一批次查询 1000 条,直到全部查询完成
List<Long> userIds = selectRecordMapper.selectBatchFromDataAlignNotePublishCountTempTable(tableNameSuffix, batchSize);
// 若记录为空,则结束循环
if (CollUtil.isEmpty(userIds)) {
break;
}
userIds.forEach(userId -> {
int notePublishTotal = selectRecordMapper.selectCountFromNoteTableByUserId(userId);
//3: 更新 t_user_count 表
int count = updateRecordMapper.updateUserNotePublishTotalByUserId(userId, notePublishTotal);
if (count > 0) {
String redisKey = RedisKeyConstants.buildCountUserKey(userId);
boolean hashKey = redisTemplate.hasKey(redisKey);
if (hashKey) {
redisTemplate.opsForHash().put(redisKey, RedisKeyConstants.FIELD_NOTE_TOTAL, notePublishTotal);
}
}
});
// 删除 t_data_align_note_publish_count_temp_日期_分片序号
deleteRecordMapper.batchDeleteDataAlignNotePublishCountTempTable(tableNameSuffix, userIds);
processedTotal += userIds.size();
}
XxlJobHelper.log("=================> 结束定时分片广播任务:对当日发生变更的用户笔记发布数进行对齐,共对齐记录数:{}", processedTotal);
}
}

View File

@@ -0,0 +1,89 @@
package com.hanserwei.hannote.data.align.job;
import cn.hutool.core.collection.CollUtil;
import com.hanserwei.hannote.data.align.constant.RedisKeyConstants;
import com.hanserwei.hannote.data.align.constant.TableConstants;
import com.hanserwei.hannote.data.align.domain.mapper.DeleteRecordMapper;
import com.hanserwei.hannote.data.align.domain.mapper.SelectRecordMapper;
import com.hanserwei.hannote.data.align.domain.mapper.UpdateRecordMapper;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
@Component
@Slf4j
public class UserCollectCountShardingXxlJob {
@Resource
private SelectRecordMapper selectRecordMapper;
@Resource
private UpdateRecordMapper updateRecordMapper;
@Resource
private DeleteRecordMapper deleteRecordMapper;
@Resource
private RedisTemplate<String, Object> redisTemplate;
@XxlJob("userCollectCountShardingJobHandler")
public void userCollectCountShardingJobHandler() throws Exception {
// 获取分片参数
// 分片序号
int shardIndex = XxlJobHelper.getShardIndex();
// 分片总数
int shardTotal = XxlJobHelper.getShardTotal();
XxlJobHelper.log("=================> 开始定时分片广播任务:对当日发生变更的用户收藏数进行对齐");
XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
// 表后缀
String date = LocalDate.now().minusDays(1) // 昨日的日期
.format(DateTimeFormatter.ofPattern("yyyyMMdd")); // 转字符串
// 表名后缀
String tableNameSuffix = TableConstants.buildTableNameSuffix(date, shardIndex);
// 一批次 1000 条
int batchSize = 1000;
// 共对齐了多少条记录,默认为 0
int processedTotal = 0;
while (true) {
// 分批次查询t_data_align_user_collect_count_temp_日期_分片序号如一批次查询 1000 条,直到全部查询完成
List<Long> userIds = selectRecordMapper.selectBatchFromDataAlignUserCollectCountTempTable(tableNameSuffix, batchSize);
// 若记录为空,则结束循环
if (CollUtil.isEmpty(userIds)) {
break;
}
userIds.forEach(userId -> {
//2: 对 t_user_collection 用户收藏表执行 count(*) 操作,获取用户获得的收藏总数
int userCollectTotal = selectRecordMapper.selectUserCollectCountFromNoteCollectionTableByUserId(userId);
// 3: 更新 t_user_count 用户计数表
int count = updateRecordMapper.updateUserCollectTotalByUserId(userId, userCollectTotal);
if (count > 0) {
String redisKey = RedisKeyConstants.buildCountUserKey(userId);
boolean hashKey = redisTemplate.hasKey(redisKey);
if (hashKey) {
redisTemplate.opsForHash().put(redisKey, RedisKeyConstants.FIELD_COLLECT_TOTAL, userCollectTotal);
}
}
});
// 4: 删除 t_data_align_user_collect_count_temp_日期_分片序号
deleteRecordMapper.batchDeleteDataAlignUserCollectCountTempTable(tableNameSuffix, userIds);
processedTotal += userIds.size();
}
XxlJobHelper.log("=================> 结束定时分片广播任务:对当日发生变更的用户收藏数进行对齐,共处理 {} 条记录", processedTotal);
}
}

View File

@@ -0,0 +1,88 @@
package com.hanserwei.hannote.data.align.job;
import cn.hutool.core.collection.CollUtil;
import com.hanserwei.hannote.data.align.constant.RedisKeyConstants;
import com.hanserwei.hannote.data.align.constant.TableConstants;
import com.hanserwei.hannote.data.align.domain.mapper.DeleteRecordMapper;
import com.hanserwei.hannote.data.align.domain.mapper.SelectRecordMapper;
import com.hanserwei.hannote.data.align.domain.mapper.UpdateRecordMapper;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
@Component
@Slf4j
public class UserLikeCountShardingXxlJob {
@Resource
private SelectRecordMapper selectRecordMapper;
@Resource
private UpdateRecordMapper updateRecordMapper;
@Resource
private DeleteRecordMapper deleteRecordMapper;
@Resource
private RedisTemplate<String, Object> redisTemplate;
@XxlJob("userLikeCountShardingJobHandler")
public void userLikeCountShardingJobHandler() throws Exception {
// 获取分片参数
// 分片序号
int shardIndex = XxlJobHelper.getShardIndex();
// 分片总数
int shardTotal = XxlJobHelper.getShardTotal();
XxlJobHelper.log("=================> 开始定时分片广播任务:对当日发生变更的用户点赞数进行对齐");
XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
// 表后缀
String date = LocalDate.now().minusDays(1) // 昨日的日期
.format(DateTimeFormatter.ofPattern("yyyyMMdd")); // 转字符串
// 表名后缀
String tableNameSuffix = TableConstants.buildTableNameSuffix(date, shardIndex);
// 一批次 1000 条
int batchSize = 1000;
// 共对齐了多少条记录,默认为 0
int processedTotal = 0;
while (true) {
// 分批次查询t_data_align_user_like_count_temp_日期_分片序号如一批次查询 1000 条,直到全部查询完成
List<Long> userIds = selectRecordMapper.selectBatchFromDataAlignUserLikeCountTempTable(tableNameSuffix, batchSize);
if (CollUtil.isEmpty(userIds)) {
break;
}
userIds.forEach(userId -> {
// 2: 对 t_user_like 用户点赞表执行 count(*) 操作,获取用户获得的点赞总数
int userLikeTotal = selectRecordMapper.selectUserLikeCountFromNoteLikeTableByUserId(userId);
// 3: 更新 t_user_count 用户表,将用户点赞总数更新到 t_user_count 表中
int count = updateRecordMapper.updateUserLikeTotalByUserId(userId, userLikeTotal);
if (count > 0) {
String redisKey = RedisKeyConstants.buildCountUserKey(userId);
boolean hashKey = redisTemplate.hasKey(redisKey);
if (hashKey) {
redisTemplate.opsForHash().put(redisKey, RedisKeyConstants.FIELD_LIKE_TOTAL, userLikeTotal);
}
}
});
// 4: 删除 t_data_align_user_like_count_temp_日期_分片序号
deleteRecordMapper.batchDeleteDataAlignUserLikeCountTempTable(tableNameSuffix, userIds);
processedTotal += userIds.size();
}
XxlJobHelper.log("=================> 结束定时分片广播任务:对当日发生变更的用户点赞数进行对齐,共对齐记录数:{}", processedTotal);
}
}

View File

@@ -9,4 +9,58 @@
#{userId}
</foreach>
</delete>
<delete id="batchDeleteDataAlignFansCountTempTable" parameterType="list">
delete
from `t_data_align_fans_count_temp_${tableNameSuffix}`
where user_id in
<foreach collection="userIds" open="(" item="userId" close=")" separator=",">
#{userId}
</foreach>
</delete>
<delete id="batchDeleteDataAlignNotePublishCountTempTable" parameterType="list">
delete
from `t_data_align_note_publish_count_temp_${tableNameSuffix}`
where user_id in
<foreach collection="userIds" open="(" item="userId" close=")" separator=",">
#{userId}
</foreach>
</delete>
<delete id="batchDeleteDataAlignNoteLikeCountTempTable" parameterType="list">
delete
from `t_data_align_note_like_count_temp_${tableNameSuffix}`
where note_id in
<foreach collection="noteIds" open="(" item="noteId" close=")" separator=",">
#{noteId}
</foreach>
</delete>
<delete id="batchDeleteDataAlignNoteCollectCountTempTable" parameterType="list">
delete
from `t_data_align_note_collect_count_temp_${tableNameSuffix}`
where note_id in
<foreach collection="noteIds" open="(" item="noteId" close=")" separator=",">
#{noteId}
</foreach>
</delete>
<delete id="batchDeleteDataAlignUserCollectCountTempTable" parameterType="map">
delete
from `t_data_align_user_collect_count_temp_${tableNameSuffix}`
where user_id in
<foreach collection="userIds" open="(" item="userId" close=")" separator=",">
#{userId}
</foreach>
</delete>
<delete id="batchDeleteDataAlignUserLikeCountTempTable" parameterType="map">
delete
from `t_data_align_user_like_count_temp_${tableNameSuffix}`
where user_id in
<foreach collection="userIds" open="(" item="userId" close=")" separator=",">
#{userId}
</foreach>
</delete>
</mapper>

View File

@@ -13,4 +13,88 @@
from t_following
where user_id = #{userId}
</select>
<select id="selectBatchFromDataAlignFansCountTempTable" resultType="long" parameterType="map">
select user_id
from `t_data_align_fans_count_temp_${tableNameSuffix}`
order by id
limit #{batchSize}
</select>
<select id="selectCountFromFansTableByUserId" parameterType="map" resultType="int">
select count(*)
from t_fans
where user_id = #{userId}
</select>
<select id="selectBatchFromDataAlignNotePublishCountTempTable" resultType="long" parameterType="map">
select user_id
from `t_data_align_note_publish_count_temp_${tableNameSuffix}`
order by id
limit #{batchSize}
</select>
<select id="selectCountFromNoteTableByUserId" parameterType="map" resultType="int">
select count(*)
from t_note
where creator_id = #{userId}
and status = 1
</select>
<select id="selectBatchFromDataAlignNoteLikeCountTempTable" resultType="long" parameterType="map">
select note_id
from `t_data_align_note_like_count_temp_${tableNameSuffix}`
order by id
limit #{batchSize}
</select>
<select id="selectCountFromNoteLikeTableByNoteId" parameterType="map" resultType="int">
select count(*)
from t_note_like
where note_id = #{noteId}
and status = 1
</select>
<select id="selectBatchFromDataAlignNoteCollectCountTempTable" resultType="long" parameterType="map">
select note_id
from `t_data_align_note_collect_count_temp_${tableNameSuffix}`
order by id
limit #{batchSize}
</select>
<select id="selectCountFromNoteCollectTableByNoteId" resultType="int" parameterType="map">
select count(*)
from t_note_collection
where note_id = #{noteId}
and status = 1
</select>
<select id="selectBatchFromDataAlignUserCollectCountTempTable" resultType="long" parameterType="map">
select user_id
from `t_data_align_user_collect_count_temp_${tableNameSuffix}`
order by id
limit #{batchSize}
</select>
<select id="selectUserCollectCountFromNoteCollectionTableByUserId" resultType="int" parameterType="map">
select count(*)
from t_note_collection
where user_id = #{userId}
and status = 1
</select>
<select id="selectBatchFromDataAlignUserLikeCountTempTable" resultType="long" parameterType="map">
select user_id
from `t_data_align_user_like_count_temp_${tableNameSuffix}`
order by id
limit #{batchSize}
</select>
<select id="selectUserLikeCountFromNoteLikeTableByUserId" resultType="int" parameterType="map">
select count(*)
from t_note_like
where user_id = #{userId}
and status = 1
</select>
</mapper>

View File

@@ -6,4 +6,40 @@
set following_total = #{followingTotal}
where user_id = #{userId}
</update>
<update id="updateUserFansTotalByUserId" parameterType="map">
update t_user_count
set fans_total = #{fansTotal}
where user_id = #{userId}
</update>
<update id="updateUserCollectTotalByUserId" parameterType="map">
update t_user_count
set collect_total = #{userCollectTotal}
where user_id = #{userId}
</update>
<update id="updateUserNotePublishTotalByUserId" parameterType="map">
update t_user_count
set note_total = #{notePublishTotal}
where user_id = #{userId}
</update>
<update id="updateNoteLikeTotalByNoteId" parameterType="map">
update t_note_count
set like_total = #{noteLikeTotal}
where note_id = #{noteId}
</update>
<update id="updateNoteCollectTotalByNoteId" parameterType="map">
update t_note_count
set collect_total = #{noteCollectTotal}
where note_id = #{noteId}
</update>
<update id="updateUserLikeTotalByUserId" parameterType="map">
update t_user_count
set like_total = #{userLikeTotal}
where user_id = #{userId}
</update>
</mapper>