diff --git a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/constant/MQConstants.java b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/constant/MQConstants.java index 63d7e7f..e8e858f 100644 --- a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/constant/MQConstants.java +++ b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/constant/MQConstants.java @@ -7,4 +7,9 @@ public interface MQConstants { */ String TOPIC_COUNT_NOTE_LIKE = "CountNoteLikeTopic"; + /** + * Topic: 计数 - 笔记收藏数 + */ + String TOPIC_COUNT_NOTE_COLLECT = "CountNoteCollectTopic"; + } \ No newline at end of file diff --git a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/constant/RedisKeyConstants.java b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/constant/RedisKeyConstants.java index 6369a3f..ac22e38 100644 --- a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/constant/RedisKeyConstants.java +++ b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/constant/RedisKeyConstants.java @@ -7,6 +7,11 @@ public class RedisKeyConstants { */ public static final String BLOOM_TODAY_NOTE_LIKE_LIST_KEY = "bloom:dataAlign:note:likes:"; + /** + * 布隆过滤器:日增量变更数据,用户笔记收藏,取消收藏 前缀 + */ + public static final String BLOOM_TODAY_NOTE_COLLECT_LIST_KEY = "bloom:dataAlign:note:collects:"; + /** * 构建完整的布隆过滤器:日增量变更数据,用户笔记点赞,取消点赞 KEY @@ -18,4 +23,14 @@ public class RedisKeyConstants { return BLOOM_TODAY_NOTE_LIKE_LIST_KEY + date; } + /** + * 构建完整的布隆过滤器:日增量变更数据,用户笔记收藏,取消收藏 KEY + * + * @param date 日期 + * @return 完整的布隆过滤器:日增量变更数据,用户笔记收藏,取消收藏 KEY + */ + public static String buildBloomUserNoteCollectListKey(String date) { + return BLOOM_TODAY_NOTE_COLLECT_LIST_KEY + date; + } + } \ No newline at end of file diff --git a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/consumer/TodayNoteCollectIncrementData2DBConsumer.java b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/consumer/TodayNoteCollectIncrementData2DBConsumer.java new file mode 100644 index 0000000..b133633 --- /dev/null +++ b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/consumer/TodayNoteCollectIncrementData2DBConsumer.java @@ -0,0 +1,111 @@ +package com.hanserwei.hannote.data.align.consumer; + +import com.hanserwei.framework.common.utils.JsonUtils; +import com.hanserwei.hannote.data.align.constant.MQConstants; +import com.hanserwei.hannote.data.align.constant.RedisKeyConstants; +import com.hanserwei.hannote.data.align.constant.TableConstants; +import com.hanserwei.hannote.data.align.domain.mapper.InsertRecordMapper; +import com.hanserwei.hannote.data.align.model.vo.CollectUnCollectNoteMqDTO; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.core.io.ClassPathResource; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.script.DefaultRedisScript; +import org.springframework.data.redis.core.script.RedisScript; +import org.springframework.scripting.support.ResourceScriptSource; +import org.springframework.stereotype.Component; +import org.springframework.transaction.support.TransactionTemplate; + +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.util.Collections; +import java.util.Objects; + +@Component +@Slf4j +@RocketMQMessageListener( + consumerGroup = "han_note_group_data_align_" + MQConstants.TOPIC_COUNT_NOTE_COLLECT, + topic = MQConstants.TOPIC_COUNT_NOTE_COLLECT +) +public class TodayNoteCollectIncrementData2DBConsumer implements RocketMQListener { + + @Resource + private RedisTemplate redisTemplate; + @Resource + private TransactionTemplate transactionTemplate; + @Resource + private InsertRecordMapper insertRecordMapper; + + /** + * 表总分片数 + */ + @Value("${table.shards}") + private int tableShards; + + @Override + public void onMessage(String body) { + log.info("## TodayNoteCollectIncrementData2DBConsumer 消费到了 MQ: {}", body); + // 1. 布隆过滤器判断该日增量数据是否已经记录 + // 消息体转DTO + CollectUnCollectNoteMqDTO collectUnCollectNoteMqDTO = JsonUtils.parseObject(body, CollectUnCollectNoteMqDTO.class); + if (Objects.isNull(collectUnCollectNoteMqDTO)) { + return; + } + log.info("## TodayNoteCollectIncrementData2DBConsumer 笔记收藏数据:{}", JsonUtils.toJsonString(collectUnCollectNoteMqDTO)); + + // 被收藏的笔记ID + Long noteId = collectUnCollectNoteMqDTO.getNoteId(); + // 笔记作者ID + Long noteCreatorId = collectUnCollectNoteMqDTO.getNoteCreatorId(); + // 今日日期 + String date = LocalDate.now() + .format(DateTimeFormatter.ofPattern("yyyyMMdd")); + + String bloomKey = RedisKeyConstants.buildBloomUserNoteCollectListKey(date); + + // 1. 布隆过滤器判断该日增量数据是否已经记录 + DefaultRedisScript script = new DefaultRedisScript<>(); + // Lua 脚本路径 + script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/lua/bloom_today_note_collect_check.lua"))); + // 返回值类型 + script.setResultType(Long.class); + + // 执行 Lua 脚本,拿到返回结果 + Long result = redisTemplate.execute(script, Collections.singletonList(bloomKey), noteId); + log.info("布隆过滤器判断结果:{}", result); + + // 若布隆过滤器判断不存在(绝对正确) + if (Objects.equals(result, 0L)) { + // 2. 若无,才会落库,减轻数据库压力 + // 根据分片总数,取模,分别获取对应的分片序号 + long userIdHashKey = noteCreatorId % tableShards; + long noteIdHashKey = noteId % tableShards; + log.info("根据分片总数,取模,分别获取对应的分片序号user:{},note:{}", userIdHashKey, noteIdHashKey); + + // 编程式事务,保证多语句的原子性 + transactionTemplate.execute(status -> { + try { + // 将日增量变更数据,分别写入两张表 + // - t_data_align_note_collect_count_temp_日期_分片序号 + // - t_data_align_user_collect_count_temp_日期_分片序号 + insertRecordMapper.insert2DataAlignNoteCollectCountTempTable(TableConstants.buildTableNameSuffix(date, noteIdHashKey), noteId); + insertRecordMapper.insert2DataAlignUserCollectCountTempTable(TableConstants.buildTableNameSuffix(date, userIdHashKey), noteCreatorId); + + return true; + } catch (Exception ex) { + status.setRollbackOnly(); // 标记事务为回滚 + log.error("", ex); + } + return false; + }); + + // 3. 数据库写入成功后,再添加布隆过滤器中 + // 4. 数据库写入成功后,再添加布隆过滤器中 + RedisScript bloomAddScript = RedisScript.of("return redis.call('BF.ADD', KEYS[1], ARGV[1])", Long.class); + redisTemplate.execute(bloomAddScript, Collections.singletonList(bloomKey), noteId); + } + } +} diff --git a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/InsertRecordMapper.java b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/InsertRecordMapper.java index b8152cd..6afbaf9 100644 --- a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/InsertRecordMapper.java +++ b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/domain/mapper/InsertRecordMapper.java @@ -17,4 +17,13 @@ public interface InsertRecordMapper { */ void insert2DataAlignUserLikeCountTempTable(@Param("tableNameSuffix") String tableNameSuffix, @Param("userId") Long userId); + /** + * 笔记收藏数:计数变更 + */ + void insert2DataAlignNoteCollectCountTempTable(@Param("tableNameSuffix") String tableNameSuffix, @Param("noteId") Long noteId); + + /** + * 用户获得的收藏数:计数变更 + */ + void insert2DataAlignUserCollectCountTempTable(@Param("tableNameSuffix") String tableNameSuffix, @Param("userId") Long userId); } \ No newline at end of file diff --git a/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/model/vo/CollectUnCollectNoteMqDTO.java b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/model/vo/CollectUnCollectNoteMqDTO.java new file mode 100644 index 0000000..2a7c9d7 --- /dev/null +++ b/han-note-data-align/src/main/java/com/hanserwei/hannote/data/align/model/vo/CollectUnCollectNoteMqDTO.java @@ -0,0 +1,31 @@ +package com.hanserwei.hannote.data.align.model.vo; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class CollectUnCollectNoteMqDTO { + + private Long userId; + + private Long noteId; + + /** + * 0: 取消收藏, 1:收藏 + */ + private Integer type; + + private LocalDateTime createTime; + + /** + * 笔记发布者 ID + */ + private Long noteCreatorId; +} \ No newline at end of file diff --git a/han-note-data-align/src/main/resources/lua/bloom_today_note_collect_check.lua b/han-note-data-align/src/main/resources/lua/bloom_today_note_collect_check.lua new file mode 100644 index 0000000..0d40b9a --- /dev/null +++ b/han-note-data-align/src/main/resources/lua/bloom_today_note_collect_check.lua @@ -0,0 +1,16 @@ +-- LUA 脚本:日增量笔记收藏、取消收藏变更数据布隆过滤器 + +local key = KEYS[1] -- 操作的 Redis Key +local noteIdAndNoteCreatorId = ARGV[1] -- Redis Value + +-- 使用 EXISTS 命令检查布隆过滤器是否存在 +local exists = redis.call('EXISTS', key) +if exists == 0 then + -- 创建布隆过滤器 + redis.call('BF.ADD', key, '') + -- 设置过期时间,一天后过期 + redis.call("EXPIRE", key, 20 * 60 * 60) +end + +-- 校验该变更数据是否已经存在(1 表示已存在,0 表示不存在) +return redis.call('BF.EXISTS', key, noteIdAndNoteCreatorId) diff --git a/han-note-data-align/src/main/resources/mapperxml/InsertRecordMapper.xml b/han-note-data-align/src/main/resources/mapperxml/InsertRecordMapper.xml index 29a2fb9..3e5c5b6 100644 --- a/han-note-data-align/src/main/resources/mapperxml/InsertRecordMapper.xml +++ b/han-note-data-align/src/main/resources/mapperxml/InsertRecordMapper.xml @@ -10,4 +10,14 @@ insert into `t_data_align_user_like_count_temp_${tableNameSuffix}` (user_id) values (#{userId}) + + + insert into `t_data_align_note_collect_count_temp_${tableNameSuffix}` (note_id) + values (#{noteId}) + + + + insert into `t_data_align_user_collect_count_temp_${tableNameSuffix}` (user_id) + values (#{userId}) + \ No newline at end of file diff --git a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/service/impl/NoteServiceImpl.java b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/service/impl/NoteServiceImpl.java index d1dc887..f2a4d9c 100644 --- a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/service/impl/NoteServiceImpl.java +++ b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/service/impl/NoteServiceImpl.java @@ -465,7 +465,7 @@ public class NoteServiceImpl extends ServiceImpl implement String contentUuid = noteDO1.getContentUuid(); // 笔记内容是否更新成功 - boolean isUpdateContentSuccess = false; + boolean isUpdateContentSuccess; if (StringUtils.isBlank(content)) { // 若笔记内容为空,则删除 K-V 存储 isUpdateContentSuccess = keyValueRpcService.deleteNoteContent(contentUuid); @@ -904,9 +904,7 @@ public class NoteServiceImpl extends ServiceImpl implement // 若目标笔记已经收藏 if (count > 0) { // 异步初始化布隆过滤器 - threadPoolTaskExecutor.submit(() -> { - batchAddNoteCollect2BloomAndExpire(userId, expireSeconds, bloomUserNoteCollectListKey); - }); + threadPoolTaskExecutor.submit(() -> batchAddNoteCollect2BloomAndExpire(userId, expireSeconds, bloomUserNoteCollectListKey)); throw new ApiException(ResponseCodeEnum.NOTE_ALREADY_COLLECTED); } // 若目标笔记未被收藏,查询当前用户是否有收藏其他笔记,有则同步初始化布隆过滤器 @@ -1037,7 +1035,7 @@ public class NoteServiceImpl extends ServiceImpl implement DefaultRedisScript script = new DefaultRedisScript<>(); // Lua 脚本路径 - script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/lua/bloom_note_uncollect_check.lua"))); + script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/lua/bloom_note_uncollected_check.lua"))); // 返回值类型 script.setResultType(Long.class); @@ -1073,7 +1071,10 @@ public class NoteServiceImpl extends ServiceImpl implement // 用户收藏列表 ZSet Key String userNoteCollectZSetKey = RedisKeyConstants.buildUserNoteCollectZSetKey(userId); - redisTemplate.opsForZSet().remove(userNoteCollectZSetKey, noteId); + Long removed = redisTemplate.opsForZSet().remove(userNoteCollectZSetKey, noteId); + if (removed != null && removed == 0) { + throw new ApiException(ResponseCodeEnum.NOTE_NOT_COLLECTED); + } // 4. 发送 MQ, 数据更新落库 // 构建消息体 DTO diff --git a/han-note-note/han-note-note-biz/src/main/resources/lua/bloom_note_uncollect_check.lua b/han-note-note/han-note-note-biz/src/main/resources/lua/bloom_note_uncollected_check.lua similarity index 100% rename from han-note-note/han-note-note-biz/src/main/resources/lua/bloom_note_uncollect_check.lua rename to han-note-note/han-note-note-biz/src/main/resources/lua/bloom_note_uncollected_check.lua