feat(relation): 实现关注列表分页查询及异步同步到Redis

- 在 PageResponse 中新增 getOffset 方法用于计算分页偏移量
- 优化关注列表分页逻辑,支持从 Redis 和数据库双重查询
- 添加线程池配置,用于异步同步关注列表至 Redis
- 实现全量同步关注列表到 Redis 的方法,并设置随机过期时间
- 封装 RPC 调用用户服务并将 DTO 转换为 VO 的公共方法
-修复分页查询边界条件判断,避免无效查询
- 使用 Lua 脚本批量操作 Redis 提高同步效率和原子性
This commit is contained in:
2025-10-14 23:31:25 +08:00
parent 1e350a4af5
commit aca7c657fa
3 changed files with 156 additions and 21 deletions

View File

@@ -0,0 +1,37 @@
package com.hanserwei.hannote.user.relation.biz.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class ThreadPoolConfig {
@Bean(name = "relationTaskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(10);
// 最大线程数
executor.setMaxPoolSize(50);
// 队列容量
executor.setQueueCapacity(200);
// 线程活跃时间(秒)
executor.setKeepAliveSeconds(30);
// 线程名前缀
executor.setThreadNamePrefix("UserExecutor-");
// 拒绝策略:由调用线程处理(一般为主线程)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
// 设置等待时间,如果超过这个时间还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是被没有完成的任务阻塞
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
}

View File

@@ -3,6 +3,7 @@ package com.hanserwei.hannote.user.relation.biz.service.impl;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.RandomUtil; import cn.hutool.core.util.RandomUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.hanserwei.framework.biz.context.holder.LoginUserContextHolder; import com.hanserwei.framework.biz.context.holder.LoginUserContextHolder;
import com.hanserwei.framework.common.exception.ApiException; import com.hanserwei.framework.common.exception.ApiException;
import com.hanserwei.framework.common.response.PageResponse; import com.hanserwei.framework.common.response.PageResponse;
@@ -34,6 +35,7 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@@ -55,6 +57,8 @@ public class RelationServiceImpl implements RelationService {
private FollowingDOService followingDOService; private FollowingDOService followingDOService;
@Resource @Resource
private RocketMQTemplate rocketMQTemplate; private RocketMQTemplate rocketMQTemplate;
@Resource(name = "relationTaskExecutor")
private ThreadPoolTaskExecutor taskExecutor;
@Override @Override
public Response<?> follow(FollowUserReqVO followUserReqVO) { public Response<?> follow(FollowUserReqVO followUserReqVO) {
@@ -280,14 +284,13 @@ public class RelationServiceImpl implements RelationService {
Long total = redisTemplate.opsForZSet().zCard(followingRedisKey); Long total = redisTemplate.opsForZSet().zCard(followingRedisKey);
log.info("==> 查询目标用户的关注列表ZSet的总大小{}", total); log.info("==> 查询目标用户的关注列表ZSet的总大小{}", total);
// 构建回参 // 构建回参
List<FindFollowingUserRspVO> findFollowingUserRspVOS = null; List<FindFollowingUserRspVO> findFollowingUserRspVOS = null;
if (total != null) { //每页展示10条数据
//缓存有数据 long limit = 10L;
//每页展示10条数据 if (total != null && total > 0) {
long limit = 10L; // 缓存有数据
// 计算一共多少页 // 计算一共多少页
long totalPage = PageResponse.getTotalPage(total, limit); long totalPage = PageResponse.getTotalPage(total, limit);
@@ -299,7 +302,7 @@ public class RelationServiceImpl implements RelationService {
// 准备从ZSet中查询分页数据 // 准备从ZSet中查询分页数据
// 每页展示10条数据计算偏移量 // 每页展示10条数据计算偏移量
long offset = (pageNo - 1) * limit; long offset = PageResponse.getOffset(pageNo, limit);
// 使用 ZREVRANGEBYSCORE 命令按 score 降序获取元素,同时使用 LIMIT 子句实现分页 // 使用 ZREVRANGEBYSCORE 命令按 score 降序获取元素,同时使用 LIMIT 子句实现分页
// 注意:这里使用了 Double.POSITIVE_INFINITY 和 Double.NEGATIVE_INFINITY 作为分数范围 // 注意:这里使用了 Double.POSITIVE_INFINITY 和 Double.NEGATIVE_INFINITY 作为分数范围
@@ -318,31 +321,112 @@ public class RelationServiceImpl implements RelationService {
log.info("==> 批量查询用户信息用户ID: {}", userIds); log.info("==> 批量查询用户信息用户ID: {}", userIds);
// RPC: 批量查询用户信息 // RPC: 批量查询用户信息
List<FindUserByIdRspDTO> findUserByIdRspDTOS = userRpcService.findByIds(userIds); //noinspection ConstantValue
findFollowingUserRspVOS = rpcUserServiceAndDTO2VO(userIds, findFollowingUserRspVOS);
}
} else {
// 若 Redis 中没有数据,则从数据库查询
// 先查询记录总量
long count = followingDOService.count(new LambdaQueryWrapper<>(FollowingDO.class)
.eq(FollowingDO::getUserId, userId));
// 计算一共多少页
log.info("==> 批量查询用户信息,结果: {}", findUserByIdRspDTOS); long totalPage = PageResponse.getTotalPage(count, limit);
// 若不为空则则DTO转换为VO // 请求页码超过总页数
if (CollUtil.isNotEmpty(findUserByIdRspDTOS)) { if (pageNo > totalPage) {
findFollowingUserRspVOS = findUserByIdRspDTOS.stream().map(findUserByIdRspDTO -> FindFollowingUserRspVO.builder() log.info("==> 批量查询用户信息,返回空数据");
.userId(findUserByIdRspDTO.getId()) return PageResponse.success(null, pageNo, total);
.introduction(findUserByIdRspDTO.getIntroduction()) }
.nickname(findUserByIdRspDTO.getNickName())
.avatar(findUserByIdRspDTO.getAvatar()) // 偏移量
.build()).toList(); long offset = PageResponse.getOffset(pageNo, limit);
}
}else { // 分页查询
// TODO: 若 Redis 中没有数据,则从数据库查询 // 从数据库分页查询
Page<FollowingDO> page = followingDOService.page(new Page<>(offset / limit + 1, limit),
new LambdaQueryWrapper<FollowingDO>()
.eq(FollowingDO::getUserId, userId)
.orderByDesc(FollowingDO::getCreateTime));
List<FollowingDO> followingDOS = page.getRecords();
// 赋值真实地记录总数
total = count;
// 若记录不为空
if (CollUtil.isNotEmpty(followingDOS)) {
// 提取所有关注用户 ID 到集合中
List<Long> userIds = followingDOS.stream().map(FollowingDO::getFollowingUserId).toList();
// RPC: 调用用户服务,并将 DTO 转换为 VO
//noinspection ConstantValue
findFollowingUserRspVOS = rpcUserServiceAndDTO2VO(userIds, findFollowingUserRspVOS);
// 异步将关注列表全量同步到 Redis
taskExecutor.submit(() -> syncFollowingList2Redis(userId));
// TODO: 异步将关注列表全量同步到 Redis
} }
} }
//noinspection DataFlowIssue
return PageResponse.success(findFollowingUserRspVOS, return PageResponse.success(findFollowingUserRspVOS,
pageNo, pageNo,
total); total);
} }
/**
* 全量同步关注列表到 Redis
*
* @param userId 用户ID
*/
private void syncFollowingList2Redis(Long userId) {
Page<FollowingDO> page = followingDOService.page(new Page<>(1, 1000),
new LambdaQueryWrapper<>(FollowingDO.class)
.select(FollowingDO::getFollowingUserId, FollowingDO::getCreateTime)
.eq(FollowingDO::getUserId, userId));
List<FollowingDO> followingDOS = page.getRecords();
log.info("==> 全量同步用户关注列表{}", JsonUtils.toJsonString(followingDOS));
if (CollUtil.isNotEmpty(followingDOS)) {
// 用户关注列表 Redis Key
String followingListRedisKey = RedisKeyConstants.buildUserFollowingKey(userId);
// 随机过期时间
// 保底1天+随机秒数
long expireSeconds = 60 * 60 * 24 + RandomUtil.randomInt(60 * 60 * 24);
// 构建 Lua 参数
Object[] luaArgs = buildLuaArgs(followingDOS, expireSeconds);
// 执行 Lua 脚本,批量同步关注关系数据到 Redis 中
DefaultRedisScript<Long> script = new DefaultRedisScript<>();
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/lua/follow_batch_add_and_expire.lua")));
script.setResultType(Long.class);
redisTemplate.execute(script, Collections.singletonList(followingListRedisKey), luaArgs);
log.info("==> 全量同步用户关注列表到 Redis用户ID: {}", userId);
}
}
/**
* RPC: 调用用户服务,并将 DTO 转换为 VO
*
* @param userIds 用户 ID 列表
* @param findFollowingUserRspVOS 跟随用户列表
* @return 跟随用户列表
*/
private List<FindFollowingUserRspVO> rpcUserServiceAndDTO2VO(List<Long> userIds, List<FindFollowingUserRspVO> findFollowingUserRspVOS) {
// RPC: 批量查询用户信息
List<FindUserByIdRspDTO> findUserByIdRspDTOS = userRpcService.findByIds(userIds);
// 若不为空DTO 转 VO
if (CollUtil.isNotEmpty(findUserByIdRspDTOS)) {
findFollowingUserRspVOS = findUserByIdRspDTOS.stream()
.map(dto -> FindFollowingUserRspVO.builder()
.userId(dto.getId())
.avatar(dto.getAvatar())
.nickname(dto.getNickName())
.introduction(dto.getIntroduction())
.build())
.toList();
}
return findFollowingUserRspVOS;
}
/** /**
* 校验 Lua 脚本结果,根据状态码抛出对应的业务异常 * 校验 Lua 脚本结果,根据状态码抛出对应的业务异常
* @param result Lua 脚本返回结果 * @param result Lua 脚本返回结果

View File

@@ -53,4 +53,18 @@ public class PageResponse<T> extends Response<List<T>> {
public static long getTotalPage(long totalCount, long pageSize) { public static long getTotalPage(long totalCount, long pageSize) {
return pageSize == 0 ? 0 : (totalCount + pageSize - 1) / pageSize; return pageSize == 0 ? 0 : (totalCount + pageSize - 1) / pageSize;
} }
/**
* 计算分页查询的 offset
* @param pageNo 页码
* @param pageSize 每页展示的数据量
* @return offset
*/
public static long getOffset(long pageNo, long pageSize) {
// 如果页码小于 1默认返回第一页的 offset
if (pageNo < 1) {
pageNo = 1;
}
return (pageNo - 1) * pageSize;
}
} }