feat(data-align): 实现用户关注数对齐分片任务
- 新增 DeleteRecordMapper 接口及 XML 配置,支持批量删除临时表记录 - 新增 SelectRecordMapper 接口及 XML 配置,支持分批查询和统计关注数 - 新增 UpdateRecordMapper 接口及 XML 配置,用于更新用户关注总数 - 新增 FollowingCountShardingXxlJob 任务类,实现分片广播处理关注数对齐逻辑 -重命名 InsertRecordMapper为 InsertMapper 并同步更新相关引用 - 在 RedisKeyConstants 中新增构建用户计数 Key 的方法及相关常量 - 修改多个消费者类中的 Mapper 引用名称以匹配重命名后的接口 - 更新数据源映射文件,调整 Mapper XML 文件路径配置
This commit is contained in:
3
.idea/data_source_mapping.xml
generated
3
.idea/data_source_mapping.xml
generated
@@ -3,7 +3,8 @@
|
|||||||
<component name="DataSourcePerFileMappings">
|
<component name="DataSourcePerFileMappings">
|
||||||
<file url="file://$APPLICATION_CONFIG_DIR$/consoles/db/5b969fbe-0f66-42be-8d30-ff21036ab8a4/console.sql" value="5b969fbe-0f66-42be-8d30-ff21036ab8a4" />
|
<file url="file://$APPLICATION_CONFIG_DIR$/consoles/db/5b969fbe-0f66-42be-8d30-ff21036ab8a4/console.sql" value="5b969fbe-0f66-42be-8d30-ff21036ab8a4" />
|
||||||
<file url="file://$APPLICATION_CONFIG_DIR$/consoles/db/f2474a4a-e4f1-4afa-bd43-7ae7738b47c5/console.sql" value="f2474a4a-e4f1-4afa-bd43-7ae7738b47c5" />
|
<file url="file://$APPLICATION_CONFIG_DIR$/consoles/db/f2474a4a-e4f1-4afa-bd43-7ae7738b47c5/console.sql" value="f2474a4a-e4f1-4afa-bd43-7ae7738b47c5" />
|
||||||
<file url="file://$PROJECT_DIR$/han-note-data-align/src/main/resources/mapperxml/InsertRecordMapper.xml" value="f2474a4a-e4f1-4afa-bd43-7ae7738b47c5" />
|
<file url="file://$PROJECT_DIR$/han-note-data-align/src/main/resources/mapperxml/InsertMapper.xml" value="f2474a4a-e4f1-4afa-bd43-7ae7738b47c5" />
|
||||||
|
<file url="file://$PROJECT_DIR$/han-note-data-align/src/main/resources/mapperxml/SelectRecordMapper.xml" value="f2474a4a-e4f1-4afa-bd43-7ae7738b47c5" />
|
||||||
<file url="file://$PROJECT_DIR$/sql/createData.sql" value="f2474a4a-e4f1-4afa-bd43-7ae7738b47c5" />
|
<file url="file://$PROJECT_DIR$/sql/createData.sql" value="f2474a4a-e4f1-4afa-bd43-7ae7738b47c5" />
|
||||||
<file url="file://$PROJECT_DIR$/sql/createTable.sql" value="f2474a4a-e4f1-4afa-bd43-7ae7738b47c5" />
|
<file url="file://$PROJECT_DIR$/sql/createTable.sql" value="f2474a4a-e4f1-4afa-bd43-7ae7738b47c5" />
|
||||||
<file url="file://$PROJECT_DIR$/sql/leafcreatetable.sql" value="c4c1f1dc-816f-4113-88d6-9ebd7677af82" />
|
<file url="file://$PROJECT_DIR$/sql/leafcreatetable.sql" value="c4c1f1dc-816f-4113-88d6-9ebd7677af82" />
|
||||||
|
|||||||
@@ -27,6 +27,26 @@ public class RedisKeyConstants {
|
|||||||
*/
|
*/
|
||||||
public static final String BLOOM_TODAY_USER_FANS_LIST_KEY = "bloom:dataAlign:user:fans:";
|
public static final String BLOOM_TODAY_USER_FANS_LIST_KEY = "bloom:dataAlign:user:fans:";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Hash Field: 关注总数
|
||||||
|
*/
|
||||||
|
public static final String FIELD_FOLLOWING_TOTAL = "followingTotal";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 用户维度计数 Key 前缀
|
||||||
|
*/
|
||||||
|
private static final String COUNT_USER_KEY_PREFIX = "count:user:";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 构建用户维度计数 Key
|
||||||
|
*
|
||||||
|
* @param userId 用户 ID
|
||||||
|
* @return 用户维度计数 Key
|
||||||
|
*/
|
||||||
|
public static String buildCountUserKey(Long userId) {
|
||||||
|
return COUNT_USER_KEY_PREFIX + userId;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 构建完整的布隆过滤器:日增量变更数据,用户笔记点赞,取消点赞 KEY
|
* 构建完整的布隆过滤器:日增量变更数据,用户笔记点赞,取消点赞 KEY
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import com.hanserwei.framework.common.utils.JsonUtils;
|
|||||||
import com.hanserwei.hannote.data.align.constant.MQConstants;
|
import com.hanserwei.hannote.data.align.constant.MQConstants;
|
||||||
import com.hanserwei.hannote.data.align.constant.RedisKeyConstants;
|
import com.hanserwei.hannote.data.align.constant.RedisKeyConstants;
|
||||||
import com.hanserwei.hannote.data.align.constant.TableConstants;
|
import com.hanserwei.hannote.data.align.constant.TableConstants;
|
||||||
import com.hanserwei.hannote.data.align.domain.mapper.InsertRecordMapper;
|
import com.hanserwei.hannote.data.align.domain.mapper.InsertMapper;
|
||||||
import com.hanserwei.hannote.data.align.model.vo.CollectUnCollectNoteMqDTO;
|
import com.hanserwei.hannote.data.align.model.vo.CollectUnCollectNoteMqDTO;
|
||||||
import jakarta.annotation.Resource;
|
import jakarta.annotation.Resource;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@@ -37,7 +37,7 @@ public class TodayNoteCollectIncrementData2DBConsumer implements RocketMQListene
|
|||||||
@Resource
|
@Resource
|
||||||
private TransactionTemplate transactionTemplate;
|
private TransactionTemplate transactionTemplate;
|
||||||
@Resource
|
@Resource
|
||||||
private InsertRecordMapper insertRecordMapper;
|
private InsertMapper insertMapper;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 表总分片数
|
* 表总分片数
|
||||||
@@ -91,8 +91,8 @@ public class TodayNoteCollectIncrementData2DBConsumer implements RocketMQListene
|
|||||||
// 将日增量变更数据,分别写入两张表
|
// 将日增量变更数据,分别写入两张表
|
||||||
// - t_data_align_note_collect_count_temp_日期_分片序号
|
// - t_data_align_note_collect_count_temp_日期_分片序号
|
||||||
// - t_data_align_user_collect_count_temp_日期_分片序号
|
// - t_data_align_user_collect_count_temp_日期_分片序号
|
||||||
insertRecordMapper.insert2DataAlignNoteCollectCountTempTable(TableConstants.buildTableNameSuffix(date, noteIdHashKey), noteId);
|
insertMapper.insert2DataAlignNoteCollectCountTempTable(TableConstants.buildTableNameSuffix(date, noteIdHashKey), noteId);
|
||||||
insertRecordMapper.insert2DataAlignUserCollectCountTempTable(TableConstants.buildTableNameSuffix(date, userIdHashKey), noteCreatorId);
|
insertMapper.insert2DataAlignUserCollectCountTempTable(TableConstants.buildTableNameSuffix(date, userIdHashKey), noteCreatorId);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import com.hanserwei.framework.common.utils.JsonUtils;
|
|||||||
import com.hanserwei.hannote.data.align.constant.MQConstants;
|
import com.hanserwei.hannote.data.align.constant.MQConstants;
|
||||||
import com.hanserwei.hannote.data.align.constant.RedisKeyConstants;
|
import com.hanserwei.hannote.data.align.constant.RedisKeyConstants;
|
||||||
import com.hanserwei.hannote.data.align.constant.TableConstants;
|
import com.hanserwei.hannote.data.align.constant.TableConstants;
|
||||||
import com.hanserwei.hannote.data.align.domain.mapper.InsertRecordMapper;
|
import com.hanserwei.hannote.data.align.domain.mapper.InsertMapper;
|
||||||
import com.hanserwei.hannote.data.align.model.vo.LikeUnlikeNoteMqDTO;
|
import com.hanserwei.hannote.data.align.model.vo.LikeUnlikeNoteMqDTO;
|
||||||
import jakarta.annotation.Resource;
|
import jakarta.annotation.Resource;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@@ -37,7 +37,7 @@ public class TodayNoteLikeIncrementData2DBConsumer implements RocketMQListener<S
|
|||||||
@Resource
|
@Resource
|
||||||
private TransactionTemplate transactionTemplate;
|
private TransactionTemplate transactionTemplate;
|
||||||
@Resource
|
@Resource
|
||||||
private InsertRecordMapper insertRecordMapper;
|
private InsertMapper insertMapper;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 表总分片数
|
* 表总分片数
|
||||||
@@ -90,8 +90,8 @@ public class TodayNoteLikeIncrementData2DBConsumer implements RocketMQListener<S
|
|||||||
// 将日增量变更数据,分别写入两张表
|
// 将日增量变更数据,分别写入两张表
|
||||||
// - t_data_align_note_like_count_temp_日期_分片序号
|
// - t_data_align_note_like_count_temp_日期_分片序号
|
||||||
// - t_data_align_user_like_count_temp_日期_分片序号
|
// - t_data_align_user_like_count_temp_日期_分片序号
|
||||||
insertRecordMapper.insert2DataAlignNoteLikeCountTempTable(TableConstants.buildTableNameSuffix(date, noteIdHashKey), noteId);
|
insertMapper.insert2DataAlignNoteLikeCountTempTable(TableConstants.buildTableNameSuffix(date, noteIdHashKey), noteId);
|
||||||
insertRecordMapper.insert2DataAlignUserLikeCountTempTable(TableConstants.buildTableNameSuffix(date, userIdHashKey), noteCreatorId);
|
insertMapper.insert2DataAlignUserLikeCountTempTable(TableConstants.buildTableNameSuffix(date, userIdHashKey), noteCreatorId);
|
||||||
return true;
|
return true;
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
status.setRollbackOnly();
|
status.setRollbackOnly();
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import com.hanserwei.framework.common.utils.JsonUtils;
|
|||||||
import com.hanserwei.hannote.data.align.constant.MQConstants;
|
import com.hanserwei.hannote.data.align.constant.MQConstants;
|
||||||
import com.hanserwei.hannote.data.align.constant.RedisKeyConstants;
|
import com.hanserwei.hannote.data.align.constant.RedisKeyConstants;
|
||||||
import com.hanserwei.hannote.data.align.constant.TableConstants;
|
import com.hanserwei.hannote.data.align.constant.TableConstants;
|
||||||
import com.hanserwei.hannote.data.align.domain.mapper.InsertRecordMapper;
|
import com.hanserwei.hannote.data.align.domain.mapper.InsertMapper;
|
||||||
import com.hanserwei.hannote.data.align.model.vo.NoteOperateMqDTO;
|
import com.hanserwei.hannote.data.align.model.vo.NoteOperateMqDTO;
|
||||||
import jakarta.annotation.Resource;
|
import jakarta.annotation.Resource;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@@ -35,7 +35,7 @@ public class TodayNotePublishIncrementData2DBConsumer implements RocketMQListene
|
|||||||
private RedisTemplate<String, Object> redisTemplate;
|
private RedisTemplate<String, Object> redisTemplate;
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private InsertRecordMapper insertRecordMapper;
|
private InsertMapper insertMapper;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 表总分片数
|
* 表总分片数
|
||||||
@@ -81,7 +81,7 @@ public class TodayNotePublishIncrementData2DBConsumer implements RocketMQListene
|
|||||||
|
|
||||||
// 将日增量变更数据,写入日增量表中
|
// 将日增量变更数据,写入日增量表中
|
||||||
// - t_data_align_note_publish_count_temp_日期_分片序号
|
// - t_data_align_note_publish_count_temp_日期_分片序号
|
||||||
insertRecordMapper.insert2DataAlignUserNotePublishCountTempTable(TableConstants.buildTableNameSuffix(date, userIdHashKey), noteCreatorId);
|
insertMapper.insert2DataAlignUserNotePublishCountTempTable(TableConstants.buildTableNameSuffix(date, userIdHashKey), noteCreatorId);
|
||||||
|
|
||||||
// 3. 数据库写入成功后,再添加布隆过滤器中
|
// 3. 数据库写入成功后,再添加布隆过滤器中
|
||||||
RedisScript<Long> bloomAddScript = RedisScript.of("return redis.call('BF.ADD', KEYS[1], ARGV[1])", Long.class);
|
RedisScript<Long> bloomAddScript = RedisScript.of("return redis.call('BF.ADD', KEYS[1], ARGV[1])", Long.class);
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import com.hanserwei.framework.common.utils.JsonUtils;
|
|||||||
import com.hanserwei.hannote.data.align.constant.MQConstants;
|
import com.hanserwei.hannote.data.align.constant.MQConstants;
|
||||||
import com.hanserwei.hannote.data.align.constant.RedisKeyConstants;
|
import com.hanserwei.hannote.data.align.constant.RedisKeyConstants;
|
||||||
import com.hanserwei.hannote.data.align.constant.TableConstants;
|
import com.hanserwei.hannote.data.align.constant.TableConstants;
|
||||||
import com.hanserwei.hannote.data.align.domain.mapper.InsertRecordMapper;
|
import com.hanserwei.hannote.data.align.domain.mapper.InsertMapper;
|
||||||
import com.hanserwei.hannote.data.align.model.vo.FollowUnfollowMqDTO;
|
import com.hanserwei.hannote.data.align.model.vo.FollowUnfollowMqDTO;
|
||||||
import jakarta.annotation.Resource;
|
import jakarta.annotation.Resource;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@@ -36,7 +36,7 @@ public class TodayUserFollowIncrementData2DBConsumer implements RocketMQListener
|
|||||||
private RedisTemplate<String, Object> redisTemplate;
|
private RedisTemplate<String, Object> redisTemplate;
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private InsertRecordMapper insertRecordMapper;
|
private InsertMapper insertMapper;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 表总分片数
|
* 表总分片数
|
||||||
@@ -90,7 +90,7 @@ public class TodayUserFollowIncrementData2DBConsumer implements RocketMQListener
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
// 将日增量变更数据,写入表 t_data_align_following_count_temp_日期_分片序号
|
// 将日增量变更数据,写入表 t_data_align_following_count_temp_日期_分片序号
|
||||||
insertRecordMapper.insert2DataAlignUserFollowingCountTempTable(
|
insertMapper.insert2DataAlignUserFollowingCountTempTable(
|
||||||
TableConstants.buildTableNameSuffix(date, userIdHashKey), userId);
|
TableConstants.buildTableNameSuffix(date, userIdHashKey), userId);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("", e);
|
log.error("", e);
|
||||||
@@ -115,7 +115,7 @@ public class TodayUserFollowIncrementData2DBConsumer implements RocketMQListener
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
// 将日增量变更数据,写入表 t_data_align_fans_count_temp_日期_分片序号
|
// 将日增量变更数据,写入表 t_data_align_fans_count_temp_日期_分片序号
|
||||||
insertRecordMapper.insert2DataAlignUserFansCountTempTable(
|
insertMapper.insert2DataAlignUserFansCountTempTable(
|
||||||
TableConstants.buildTableNameSuffix(date, targetUserIdHashKey), targetUserId);
|
TableConstants.buildTableNameSuffix(date, targetUserIdHashKey), targetUserId);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("", e);
|
log.error("", e);
|
||||||
|
|||||||
@@ -0,0 +1,16 @@
|
|||||||
|
package com.hanserwei.hannote.data.align.domain.mapper;
|
||||||
|
|
||||||
|
import org.apache.ibatis.annotations.Param;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public interface DeleteRecordMapper {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 日增量表:关注数计数变更 - 批量删除
|
||||||
|
*
|
||||||
|
* @param userIds 用户 ID
|
||||||
|
*/
|
||||||
|
void batchDeleteDataAlignFollowingCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
|
||||||
|
@Param("userIds") List<Long> userIds);
|
||||||
|
}
|
||||||
@@ -5,7 +5,7 @@ import org.apache.ibatis.annotations.Param;
|
|||||||
/**
|
/**
|
||||||
* 添加记录
|
* 添加记录
|
||||||
*/
|
*/
|
||||||
public interface InsertRecordMapper {
|
public interface InsertMapper {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 笔记点赞数:计数变更
|
* 笔记点赞数:计数变更
|
||||||
@@ -0,0 +1,30 @@
|
|||||||
|
package com.hanserwei.hannote.data.align.domain.mapper;
|
||||||
|
|
||||||
|
import org.apache.ibatis.annotations.Param;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 查询
|
||||||
|
*/
|
||||||
|
public interface SelectRecordMapper {
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 日增量表:关注数计数变更 - 批量查询
|
||||||
|
*
|
||||||
|
* @param tableNameSuffix 表名后缀
|
||||||
|
* @param batchSize 批量大小
|
||||||
|
* @return 批量查询结果
|
||||||
|
*/
|
||||||
|
List<Long> selectBatchFromDataAlignFollowingCountTempTable(@Param("tableNameSuffix") String tableNameSuffix,
|
||||||
|
@Param("batchSize") int batchSize);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 查询 t_following 关注表,获取关注总数
|
||||||
|
*
|
||||||
|
* @param userId 用户 ID
|
||||||
|
* @return 关注总数
|
||||||
|
*/
|
||||||
|
int selectCountFromFollowingTableByUserId(long userId);
|
||||||
|
}
|
||||||
@@ -0,0 +1,15 @@
|
|||||||
|
package com.hanserwei.hannote.data.align.domain.mapper;
|
||||||
|
|
||||||
|
import org.apache.ibatis.annotations.Param;
|
||||||
|
|
||||||
|
public interface UpdateRecordMapper {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 更新 t_user_count 计数表总关注数
|
||||||
|
*
|
||||||
|
* @param userId 用户 ID
|
||||||
|
* @return 更新行数
|
||||||
|
*/
|
||||||
|
int updateUserFollowingTotalByUserId(@Param("userId") long userId,
|
||||||
|
@Param("followingTotal") int followingTotal);
|
||||||
|
}
|
||||||
@@ -0,0 +1,101 @@
|
|||||||
|
package com.hanserwei.hannote.data.align.job;
|
||||||
|
|
||||||
|
import cn.hutool.core.collection.CollUtil;
|
||||||
|
import com.hanserwei.hannote.data.align.constant.RedisKeyConstants;
|
||||||
|
import com.hanserwei.hannote.data.align.constant.TableConstants;
|
||||||
|
import com.hanserwei.hannote.data.align.domain.mapper.DeleteRecordMapper;
|
||||||
|
import com.hanserwei.hannote.data.align.domain.mapper.SelectRecordMapper;
|
||||||
|
import com.hanserwei.hannote.data.align.domain.mapper.UpdateRecordMapper;
|
||||||
|
import com.xxl.job.core.context.XxlJobHelper;
|
||||||
|
import com.xxl.job.core.handler.annotation.XxlJob;
|
||||||
|
import jakarta.annotation.Resource;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.data.redis.core.RedisTemplate;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.time.LocalDate;
|
||||||
|
import java.time.format.DateTimeFormatter;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@Slf4j
|
||||||
|
public class FollowingCountShardingXxlJob {
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private SelectRecordMapper selectRecordMapper;
|
||||||
|
@Resource
|
||||||
|
private UpdateRecordMapper updateRecordMapper;
|
||||||
|
@Resource
|
||||||
|
private DeleteRecordMapper deleteRecordMapper;
|
||||||
|
@Resource
|
||||||
|
private RedisTemplate<String, Object> redisTemplate;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 分片广播任务
|
||||||
|
*/
|
||||||
|
@XxlJob("followingCountShardingJobHandler")
|
||||||
|
public void followingCountShardingJobHandler() throws Exception {
|
||||||
|
// 获取分片参数
|
||||||
|
// 分片序号
|
||||||
|
int shardIndex = XxlJobHelper.getShardIndex();
|
||||||
|
// 分片总数
|
||||||
|
int shardTotal = XxlJobHelper.getShardTotal();
|
||||||
|
|
||||||
|
XxlJobHelper.log("=================> 开始定时分片广播任务:对当日发生变更的用户关注数进行对齐");
|
||||||
|
XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
|
||||||
|
|
||||||
|
log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
|
||||||
|
|
||||||
|
// 表后缀
|
||||||
|
String date = LocalDate.now().minusDays(1) // 昨日的日期
|
||||||
|
.format(DateTimeFormatter.ofPattern("yyyyMMdd")); // 转字符串
|
||||||
|
// 表名后缀
|
||||||
|
String tableNameSuffix = TableConstants.buildTableNameSuffix(date, shardIndex);
|
||||||
|
|
||||||
|
// 一批次 1000 条
|
||||||
|
int batchSize = 1000;
|
||||||
|
// 共对齐了多少条记录,默认为 0
|
||||||
|
int processedTotal = 0;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
// 1. 分批次查询,如一次查询1000 条,直到查询完毕
|
||||||
|
List<Long> userIds = selectRecordMapper.selectBatchFromDataAlignFollowingCountTempTable(tableNameSuffix, batchSize);
|
||||||
|
|
||||||
|
// 若记录为空,则结束循环
|
||||||
|
if (CollUtil.isEmpty(userIds)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 循环这一批发生变更的用户 ID
|
||||||
|
userIds.forEach(userId -> {
|
||||||
|
// 2: 对 t_following 关注表执行 count(*) 操作,获取关注总数
|
||||||
|
int followingTotal = selectRecordMapper.selectCountFromFollowingTableByUserId(userId);
|
||||||
|
|
||||||
|
// 3: 更新 t_user_count 表
|
||||||
|
int count = updateRecordMapper.updateUserFollowingTotalByUserId(userId, followingTotal);
|
||||||
|
// 更新对应 Redis 缓存
|
||||||
|
if (count > 0) {
|
||||||
|
String redisKey = RedisKeyConstants.buildCountUserKey(userId);
|
||||||
|
// 判断 Hash 是否存在
|
||||||
|
boolean hashKey = redisTemplate.hasKey(redisKey);
|
||||||
|
// 若存在
|
||||||
|
if (hashKey) {
|
||||||
|
// 更新 Hash 中的 Field 关注总数
|
||||||
|
redisTemplate.opsForHash().put(redisKey, RedisKeyConstants.FIELD_FOLLOWING_TOTAL, followingTotal);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
// 4. 批量物理删除这一批次记录
|
||||||
|
|
||||||
|
deleteRecordMapper.batchDeleteDataAlignFollowingCountTempTable(tableNameSuffix, userIds);
|
||||||
|
|
||||||
|
// 当前已处理的记录数
|
||||||
|
processedTotal += userIds.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
XxlJobHelper.log("=================> 结束定时分片广播任务:对当日发生变更的用户关注数进行对齐,共对齐记录数:{}", processedTotal);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,12 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8" ?>
|
||||||
|
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
|
||||||
|
<mapper namespace="com.hanserwei.hannote.data.align.domain.mapper.DeleteRecordMapper">
|
||||||
|
<delete id="batchDeleteDataAlignFollowingCountTempTable" parameterType="list">
|
||||||
|
delete
|
||||||
|
from `t_data_align_following_count_temp_${tableNameSuffix}`
|
||||||
|
where user_id in
|
||||||
|
<foreach collection="userIds" open="(" item="userId" close=")" separator=",">
|
||||||
|
#{userId}
|
||||||
|
</foreach>
|
||||||
|
</delete>
|
||||||
|
</mapper>
|
||||||
@@ -1,6 +1,6 @@
|
|||||||
<?xml version="1.0" encoding="UTF-8" ?>
|
<?xml version="1.0" encoding="UTF-8" ?>
|
||||||
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
|
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
|
||||||
<mapper namespace="com.hanserwei.hannote.data.align.domain.mapper.InsertRecordMapper">
|
<mapper namespace="com.hanserwei.hannote.data.align.domain.mapper.InsertMapper">
|
||||||
<insert id="insert2DataAlignNoteLikeCountTempTable" parameterType="map">
|
<insert id="insert2DataAlignNoteLikeCountTempTable" parameterType="map">
|
||||||
insert into `t_data_align_note_like_count_temp_${tableNameSuffix}` (note_id)
|
insert into `t_data_align_note_like_count_temp_${tableNameSuffix}` (note_id)
|
||||||
values (#{noteId})
|
values (#{noteId})
|
||||||
@@ -0,0 +1,16 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8" ?>
|
||||||
|
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
|
||||||
|
<mapper namespace="com.hanserwei.hannote.data.align.domain.mapper.SelectRecordMapper">
|
||||||
|
<select id="selectBatchFromDataAlignFollowingCountTempTable" resultType="long" parameterType="map">
|
||||||
|
select user_id
|
||||||
|
from `t_data_align_following_count_temp_${tableNameSuffix}`
|
||||||
|
order by id
|
||||||
|
limit #{batchSize}
|
||||||
|
</select>
|
||||||
|
|
||||||
|
<select id="selectCountFromFollowingTableByUserId" parameterType="map" resultType="int">
|
||||||
|
select count(*)
|
||||||
|
from t_following
|
||||||
|
where user_id = #{userId}
|
||||||
|
</select>
|
||||||
|
</mapper>
|
||||||
@@ -0,0 +1,9 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8" ?>
|
||||||
|
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
|
||||||
|
<mapper namespace="com.hanserwei.hannote.data.align.domain.mapper.UpdateRecordMapper">
|
||||||
|
<update id="updateUserFollowingTotalByUserId" parameterType="map">
|
||||||
|
update t_user_count
|
||||||
|
set following_total = #{followingTotal}
|
||||||
|
where user_id = #{userId}
|
||||||
|
</update>
|
||||||
|
</mapper>
|
||||||
Reference in New Issue
Block a user