diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountFollowing2DBConsumer.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountFollowing2DBConsumer.java index 3f9f4a5..fc35ae5 100644 --- a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountFollowing2DBConsumer.java +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountFollowing2DBConsumer.java @@ -17,7 +17,7 @@ import java.util.Objects; @Component @SuppressWarnings("ALL") -@RocketMQMessageListener(consumerGroup = "han_note_" + MQConstants.TOPIC_COUNT_FOLLOWING_2_DB, // Group 组 +@RocketMQMessageListener(consumerGroup = "han_note_group_" + MQConstants.TOPIC_COUNT_FOLLOWING_2_DB, // Group 组 topic = MQConstants.TOPIC_COUNT_FOLLOWING_2_DB // 主题 Topic ) @Slf4j diff --git a/han-note-data-align/pom.xml b/han-note-data-align/pom.xml index dbb99d5..920259c 100644 --- a/han-note-data-align/pom.xml +++ b/han-note-data-align/pom.xml @@ -83,6 +83,19 @@ com.xuxueli xxl-job-core + + + + com.hanserwei + hanserwei-spring-boot-starter-jackson + + + + + org.apache.rocketmq + rocketmq-spring-boot-starter + + diff --git a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/config/RocketMQConfig.java b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/config/RocketMQConfig.java new file mode 100644 index 0000000..6bbcf5e --- /dev/null +++ b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/config/RocketMQConfig.java @@ -0,0 +1,10 @@ +package com.hanserwei.hannote.data.align.config; + +import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +@Configuration +@Import(RocketMQAutoConfiguration.class) +public class RocketMQConfig { +} \ No newline at end of file diff --git a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/constant/MQConstants.java b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/constant/MQConstants.java new file mode 100644 index 0000000..63d7e7f --- /dev/null +++ b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/constant/MQConstants.java @@ -0,0 +1,10 @@ +package com.hanserwei.hannote.data.align.constant; + +public interface MQConstants { + + /** + * Topic: 计数 - 笔记点赞数 + */ + String TOPIC_COUNT_NOTE_LIKE = "CountNoteLikeTopic"; + +} \ No newline at end of file diff --git a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/consumer/TodayNoteLikeIncrementData2DBConsumer.java b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/consumer/TodayNoteLikeIncrementData2DBConsumer.java new file mode 100644 index 0000000..f98f739 --- /dev/null +++ b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/consumer/TodayNoteLikeIncrementData2DBConsumer.java @@ -0,0 +1,20 @@ +package com.hanserwei.hannote.data.align.consumer; + +import com.hanserwei.hannote.data.align.constant.MQConstants; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +@RocketMQMessageListener( + consumerGroup = "han_note_group_data_align_" + MQConstants.TOPIC_COUNT_NOTE_LIKE, + topic = MQConstants.TOPIC_COUNT_NOTE_LIKE +) +public class TodayNoteLikeIncrementData2DBConsumer implements RocketMQListener { + @Override + public void onMessage(String body) { + log.info("## TodayNoteLikeIncrementData2DBConsumer 消费到了 MQ: {}", body); + } +} diff --git a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/LikeUnlikeNoteConsumer.java b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/LikeUnlikeNoteConsumer.java index 3a9ee6b..c6e45d2 100644 --- a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/LikeUnlikeNoteConsumer.java +++ b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/LikeUnlikeNoteConsumer.java @@ -99,6 +99,7 @@ public class LikeUnlikeNoteConsumer implements RocketMQListener { // 执行更新 boolean update = noteLikeDOService.update(updateEntity, wrapper); + log.info("==> 【取消点赞笔记】更新数据库成功,update: {}", update); if (!update) { return; diff --git a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/service/impl/NoteServiceImpl.java b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/service/impl/NoteServiceImpl.java index fe5bf64..d1dc887 100644 --- a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/service/impl/NoteServiceImpl.java +++ b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/service/impl/NoteServiceImpl.java @@ -792,26 +792,24 @@ public class NoteServiceImpl extends ServiceImpl implement NoteUnlikeLuaResultEnum noteUnlikeLuaResultEnum = NoteUnlikeLuaResultEnum.valueOf(result); log.info("==> 【笔记取消点赞】Lua 脚本返回结果: {}", noteUnlikeLuaResultEnum); - assert noteUnlikeLuaResultEnum != null; - switch (noteUnlikeLuaResultEnum) { + switch (Objects.requireNonNull(noteUnlikeLuaResultEnum)) { // 布隆过滤器不存在 - case NOT_EXIST -> { - //笔记不存在 + case NOT_EXIST -> {//笔记不存在 //异步初始化布隆过滤器 threadPoolTaskExecutor.submit(() -> { // 保底1天+随机秒数 long expireSeconds = 60 * 60 * 24 + RandomUtil.randomInt(60 * 60 * 24); batchAddNoteLike2BloomAndExpire(userId, expireSeconds, bloomUserNoteLikeListKey); - // 从数据库中校验笔记是否被点赞 - long count = noteLikeDOService.count(new LambdaQueryWrapper<>(NoteLikeDO.class) - .eq(NoteLikeDO::getUserId, userId) - .eq(NoteLikeDO::getNoteId, noteId) - .eq(NoteLikeDO::getStatus, LikeStatusEnum.LIKE.getCode())); - if (count == 0) { - log.info("==> 【笔记取消点赞】用户未点赞该笔记"); - throw new ApiException(ResponseCodeEnum.NOTE_NOT_LIKED); - } }); + // 从数据库中校验笔记是否被点赞 + long count = noteLikeDOService.count(new LambdaQueryWrapper<>(NoteLikeDO.class) + .eq(NoteLikeDO::getUserId, userId) + .eq(NoteLikeDO::getNoteId, noteId) + .eq(NoteLikeDO::getStatus, LikeStatusEnum.LIKE.getCode())); + if (count == 0) { + log.info("==> 【笔记取消点赞】用户未点赞该笔记"); + throw new ApiException(ResponseCodeEnum.NOTE_NOT_LIKED); + } } // 布隆过滤器校验目标笔记未被点赞(判断绝对正确) case NOTE_NOT_LIKED -> throw new ApiException(ResponseCodeEnum.NOTE_NOT_LIKED); @@ -821,7 +819,14 @@ public class NoteServiceImpl extends ServiceImpl implement // 用户点赞列表ZsetKey String userNoteLikeZSetKey = RedisKeyConstants.buildUserNoteLikeZSetKey(userId); - redisTemplate.opsForZSet().remove(userNoteLikeZSetKey, noteId); + // TODO: 后续考虑换掉布隆过滤器。 + + Long removed = redisTemplate.opsForZSet().remove(userNoteLikeZSetKey, noteId); + + if (Objects.nonNull(removed) && removed == 0) { + log.info("==> 【笔记取消点赞】用户未点赞该笔记"); + throw new ApiException(ResponseCodeEnum.NOTE_NOT_LIKED); + } //4. 发送 MQ, 数据更新落库 // 构建MQ消息体 @@ -932,7 +937,6 @@ public class NoteServiceImpl extends ServiceImpl implement } } - // 3. 更新用户 ZSET 收藏列表 // 3. 更新用户 ZSET 收藏列表 LocalDateTime now = LocalDateTime.now(); // Lua 脚本路径 @@ -1271,11 +1275,14 @@ public class NoteServiceImpl extends ServiceImpl implement */ private void batchAddNoteLike2BloomAndExpire(Long userId, long expireSeconds, String bloomUserNoteLikeListKey) { try { + log.info("## 异步初始化【笔记点赞】布隆过滤器开始: userId={}", userId); // 异步全量同步一下,并设置过期时间 List noteLikeDOS = noteLikeDOService.list(new LambdaQueryWrapper<>(NoteLikeDO.class) .select(NoteLikeDO::getNoteId) .eq(NoteLikeDO::getUserId, userId) .eq(NoteLikeDO::getStatus, LikeStatusEnum.LIKE.getCode())); + log.info("## 异步初始化【笔记点赞】布隆过滤器,用户笔记点赞数量: {},笔记ID:{}", noteLikeDOS.size(), + JsonUtils.toJsonString(noteLikeDOS.stream().map(NoteLikeDO::getNoteId).toList())); if (CollUtil.isNotEmpty(noteLikeDOS)) { DefaultRedisScript script = new DefaultRedisScript<>(); @@ -1288,7 +1295,8 @@ public class NoteServiceImpl extends ServiceImpl implement List luaArgs = Lists.newArrayList(); noteLikeDOS.forEach(noteLikeDO -> luaArgs.add(noteLikeDO.getNoteId())); // 将每个点赞的笔记 ID 传入 luaArgs.add(expireSeconds); // 最后一个参数是过期时间(秒) - redisTemplate.execute(script, Collections.singletonList(bloomUserNoteLikeListKey), luaArgs.toArray()); + Long result = redisTemplate.execute(script, Collections.singletonList(bloomUserNoteLikeListKey), luaArgs.toArray()); + log.info("## 异步初始化【笔记点赞】布隆过滤器结果: {}", result); } } catch (Exception e) { log.error("## 异步初始化布隆过滤器异常: ", e); diff --git a/han-note-user-relation/han-note-user-relation-biz/src/main/java/com/hanserwei/hannote/user/relation/biz/consumer/FollowUnfollowConsumer.java b/han-note-user-relation/han-note-user-relation-biz/src/main/java/com/hanserwei/hannote/user/relation/biz/consumer/FollowUnfollowConsumer.java index ba34355..71af7ba 100644 --- a/han-note-user-relation/han-note-user-relation-biz/src/main/java/com/hanserwei/hannote/user/relation/biz/consumer/FollowUnfollowConsumer.java +++ b/han-note-user-relation/han-note-user-relation-biz/src/main/java/com/hanserwei/hannote/user/relation/biz/consumer/FollowUnfollowConsumer.java @@ -36,6 +36,7 @@ import java.time.LocalDateTime; import java.util.Collections; import java.util.Objects; +@SuppressWarnings("UnstableApiUsage") @Component @RocketMQMessageListener( consumerGroup = "han_note_group_" + MQConstants.TOPIC_FOLLOW_OR_UNFOLLOW, //han_note_group_FollowUnfollowTopic