diff --git a/han-note-user-relation/han-note-user-relation-biz/src/main/java/com/hanserwei/hannote/user/relation/biz/config/ThreadPoolConfig.java b/han-note-user-relation/han-note-user-relation-biz/src/main/java/com/hanserwei/hannote/user/relation/biz/config/ThreadPoolConfig.java new file mode 100644 index 0000000..fd17cd0 --- /dev/null +++ b/han-note-user-relation/han-note-user-relation-biz/src/main/java/com/hanserwei/hannote/user/relation/biz/config/ThreadPoolConfig.java @@ -0,0 +1,37 @@ +package com.hanserwei.hannote.user.relation.biz.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.ThreadPoolExecutor; + +@Configuration +public class ThreadPoolConfig { + + @Bean(name = "relationTaskExecutor") + public ThreadPoolTaskExecutor taskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + // 核心线程数 + executor.setCorePoolSize(10); + // 最大线程数 + executor.setMaxPoolSize(50); + // 队列容量 + executor.setQueueCapacity(200); + // 线程活跃时间(秒) + executor.setKeepAliveSeconds(30); + // 线程名前缀 + executor.setThreadNamePrefix("UserExecutor-"); + + // 拒绝策略:由调用线程处理(一般为主线程) + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + + // 等待所有任务结束后再关闭线程池 + executor.setWaitForTasksToCompleteOnShutdown(true); + // 设置等待时间,如果超过这个时间还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是被没有完成的任务阻塞 + executor.setAwaitTerminationSeconds(60); + + executor.initialize(); + return executor; + } +} \ No newline at end of file diff --git a/han-note-user-relation/han-note-user-relation-biz/src/main/java/com/hanserwei/hannote/user/relation/biz/service/impl/RelationServiceImpl.java b/han-note-user-relation/han-note-user-relation-biz/src/main/java/com/hanserwei/hannote/user/relation/biz/service/impl/RelationServiceImpl.java index 515d07a..5172290 100644 --- a/han-note-user-relation/han-note-user-relation-biz/src/main/java/com/hanserwei/hannote/user/relation/biz/service/impl/RelationServiceImpl.java +++ b/han-note-user-relation/han-note-user-relation-biz/src/main/java/com/hanserwei/hannote/user/relation/biz/service/impl/RelationServiceImpl.java @@ -3,6 +3,7 @@ package com.hanserwei.hannote.user.relation.biz.service.impl; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.RandomUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.hanserwei.framework.biz.context.holder.LoginUserContextHolder; import com.hanserwei.framework.common.exception.ApiException; import com.hanserwei.framework.common.response.PageResponse; @@ -34,6 +35,7 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.stereotype.Service; @@ -55,6 +57,8 @@ public class RelationServiceImpl implements RelationService { private FollowingDOService followingDOService; @Resource private RocketMQTemplate rocketMQTemplate; + @Resource(name = "relationTaskExecutor") + private ThreadPoolTaskExecutor taskExecutor; @Override public Response follow(FollowUserReqVO followUserReqVO) { @@ -280,14 +284,13 @@ public class RelationServiceImpl implements RelationService { Long total = redisTemplate.opsForZSet().zCard(followingRedisKey); log.info("==> 查询目标用户的关注列表ZSet的总大小{}", total); - // 构建回参 List findFollowingUserRspVOS = null; - if (total != null) { - //缓存有数据 - //每页展示10条数据 - long limit = 10L; + //每页展示10条数据 + long limit = 10L; + if (total != null && total > 0) { + // 缓存有数据 // 计算一共多少页 long totalPage = PageResponse.getTotalPage(total, limit); @@ -299,7 +302,7 @@ public class RelationServiceImpl implements RelationService { // 准备从ZSet中查询分页数据 // 每页展示10条数据,计算偏移量 - long offset = (pageNo - 1) * limit; + long offset = PageResponse.getOffset(pageNo, limit); // 使用 ZREVRANGEBYSCORE 命令按 score 降序获取元素,同时使用 LIMIT 子句实现分页 // 注意:这里使用了 Double.POSITIVE_INFINITY 和 Double.NEGATIVE_INFINITY 作为分数范围 @@ -318,31 +321,112 @@ public class RelationServiceImpl implements RelationService { log.info("==> 批量查询用户信息,用户ID: {}", userIds); // RPC: 批量查询用户信息 - List findUserByIdRspDTOS = userRpcService.findByIds(userIds); + //noinspection ConstantValue + findFollowingUserRspVOS = rpcUserServiceAndDTO2VO(userIds, findFollowingUserRspVOS); + } + } else { + // 若 Redis 中没有数据,则从数据库查询 + // 先查询记录总量 + long count = followingDOService.count(new LambdaQueryWrapper<>(FollowingDO.class) + .eq(FollowingDO::getUserId, userId)); + // 计算一共多少页 - log.info("==> 批量查询用户信息,结果: {}", findUserByIdRspDTOS); + long totalPage = PageResponse.getTotalPage(count, limit); - // 若不为空则,则DTO转换为VO - if (CollUtil.isNotEmpty(findUserByIdRspDTOS)) { - findFollowingUserRspVOS = findUserByIdRspDTOS.stream().map(findUserByIdRspDTO -> FindFollowingUserRspVO.builder() - .userId(findUserByIdRspDTO.getId()) - .introduction(findUserByIdRspDTO.getIntroduction()) - .nickname(findUserByIdRspDTO.getNickName()) - .avatar(findUserByIdRspDTO.getAvatar()) - .build()).toList(); - } - }else { - // TODO: 若 Redis 中没有数据,则从数据库查询 + // 请求页码超过总页数 + if (pageNo > totalPage) { + log.info("==> 批量查询用户信息,返回空数据"); + return PageResponse.success(null, pageNo, total); + } + + // 偏移量 + long offset = PageResponse.getOffset(pageNo, limit); + + // 分页查询 + // 从数据库分页查询 + Page page = followingDOService.page(new Page<>(offset / limit + 1, limit), + new LambdaQueryWrapper() + .eq(FollowingDO::getUserId, userId) + .orderByDesc(FollowingDO::getCreateTime)); + List followingDOS = page.getRecords(); + // 赋值真实地记录总数 + total = count; + // 若记录不为空 + if (CollUtil.isNotEmpty(followingDOS)) { + // 提取所有关注用户 ID 到集合中 + List userIds = followingDOS.stream().map(FollowingDO::getFollowingUserId).toList(); + + // RPC: 调用用户服务,并将 DTO 转换为 VO + //noinspection ConstantValue + findFollowingUserRspVOS = rpcUserServiceAndDTO2VO(userIds, findFollowingUserRspVOS); + + // 异步将关注列表全量同步到 Redis + taskExecutor.submit(() -> syncFollowingList2Redis(userId)); - // TODO: 异步将关注列表全量同步到 Redis } } - //noinspection DataFlowIssue + return PageResponse.success(findFollowingUserRspVOS, pageNo, total); } + /** + * 全量同步关注列表到 Redis + * + * @param userId 用户ID + */ + private void syncFollowingList2Redis(Long userId) { + Page page = followingDOService.page(new Page<>(1, 1000), + new LambdaQueryWrapper<>(FollowingDO.class) + .select(FollowingDO::getFollowingUserId, FollowingDO::getCreateTime) + .eq(FollowingDO::getUserId, userId)); + List followingDOS = page.getRecords(); + log.info("==> 全量同步用户关注列表{}", JsonUtils.toJsonString(followingDOS)); + if (CollUtil.isNotEmpty(followingDOS)) { + // 用户关注列表 Redis Key + String followingListRedisKey = RedisKeyConstants.buildUserFollowingKey(userId); + // 随机过期时间 + // 保底1天+随机秒数 + long expireSeconds = 60 * 60 * 24 + RandomUtil.randomInt(60 * 60 * 24); + // 构建 Lua 参数 + Object[] luaArgs = buildLuaArgs(followingDOS, expireSeconds); + + // 执行 Lua 脚本,批量同步关注关系数据到 Redis 中 + DefaultRedisScript script = new DefaultRedisScript<>(); + script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/lua/follow_batch_add_and_expire.lua"))); + script.setResultType(Long.class); + redisTemplate.execute(script, Collections.singletonList(followingListRedisKey), luaArgs); + log.info("==> 全量同步用户关注列表到 Redis,用户ID: {}", userId); + } + + } + + /** + * RPC: 调用用户服务,并将 DTO 转换为 VO + * + * @param userIds 用户 ID 列表 + * @param findFollowingUserRspVOS 跟随用户列表 + * @return 跟随用户列表 + */ + private List rpcUserServiceAndDTO2VO(List userIds, List findFollowingUserRspVOS) { + // RPC: 批量查询用户信息 + List findUserByIdRspDTOS = userRpcService.findByIds(userIds); + + // 若不为空,DTO 转 VO + if (CollUtil.isNotEmpty(findUserByIdRspDTOS)) { + findFollowingUserRspVOS = findUserByIdRspDTOS.stream() + .map(dto -> FindFollowingUserRspVO.builder() + .userId(dto.getId()) + .avatar(dto.getAvatar()) + .nickname(dto.getNickName()) + .introduction(dto.getIntroduction()) + .build()) + .toList(); + } + return findFollowingUserRspVOS; + } + /** * 校验 Lua 脚本结果,根据状态码抛出对应的业务异常 * @param result Lua 脚本返回结果 diff --git a/hanserwei-framework/hanserwei-common/src/main/java/com/hanserwei/framework/common/response/PageResponse.java b/hanserwei-framework/hanserwei-common/src/main/java/com/hanserwei/framework/common/response/PageResponse.java index b22a4a6..723c2bd 100644 --- a/hanserwei-framework/hanserwei-common/src/main/java/com/hanserwei/framework/common/response/PageResponse.java +++ b/hanserwei-framework/hanserwei-common/src/main/java/com/hanserwei/framework/common/response/PageResponse.java @@ -53,4 +53,18 @@ public class PageResponse extends Response> { public static long getTotalPage(long totalCount, long pageSize) { return pageSize == 0 ? 0 : (totalCount + pageSize - 1) / pageSize; } + + /** + * 计算分页查询的 offset + * @param pageNo 页码 + * @param pageSize 每页展示的数据量 + * @return offset + */ + public static long getOffset(long pageNo, long pageSize) { + // 如果页码小于 1,默认返回第一页的 offset + if (pageNo < 1) { + pageNo = 1; + } + return (pageNo - 1) * pageSize; + } }