diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/constant/MQConstants.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/constant/MQConstants.java index 9e0de5e..40b09c3 100644 --- a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/constant/MQConstants.java +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/constant/MQConstants.java @@ -32,4 +32,14 @@ public interface MQConstants { */ String TOPIC_COUNT_NOTE_LIKE_2_DB = "CountNoteLike2DBTTopic"; + /** + * Topic: 计数 - 笔记收藏数 + */ + String TOPIC_COUNT_NOTE_COLLECT = "CountNoteCollectTopic"; + + /** + * Topic: 计数 - 笔记收藏数落库 + */ + String TOPIC_COUNT_NOTE_COLLECT_2_DB = "CountNoteCollect2DBTTopic"; + } \ No newline at end of file diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/constant/RedisKeyConstants.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/constant/RedisKeyConstants.java index fd5fdea..e6dffcb 100644 --- a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/constant/RedisKeyConstants.java +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/constant/RedisKeyConstants.java @@ -26,6 +26,11 @@ public class RedisKeyConstants { */ private static final String COUNT_NOTE_KEY_PREFIX = "count:note:"; + /** + * Hash Field: 笔记收藏总数 + */ + public static final String FIELD_COLLECT_TOTAL = "collectTotal"; + /** * 构建用户维度计数 Key * diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountFans2DBConsumer.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountFans2DBConsumer.java index 8ea7051..646d56c 100644 --- a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountFans2DBConsumer.java +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountFans2DBConsumer.java @@ -15,7 +15,7 @@ import java.util.Map; @SuppressWarnings("ALL") @Component -@RocketMQMessageListener(consumerGroup = "han_note_" + MQConstants.TOPIC_COUNT_FANS_2_DB, // Group 组 +@RocketMQMessageListener(consumerGroup = "han_note_group_" + MQConstants.TOPIC_COUNT_FANS_2_DB, // Group 组 topic = MQConstants.TOPIC_COUNT_FANS_2_DB // 主题 Topic ) @Slf4j diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteCollect2DBConsumer.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteCollect2DBConsumer.java new file mode 100644 index 0000000..e35c0d1 --- /dev/null +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteCollect2DBConsumer.java @@ -0,0 +1,49 @@ +package com.hanserwei.hannote.count.biz.consumer; + +import cn.hutool.core.collection.CollUtil; +import com.google.common.util.concurrent.RateLimiter; +import com.hanserwei.framework.common.utils.JsonUtils; +import com.hanserwei.hannote.count.biz.constant.MQConstants; +import com.hanserwei.hannote.count.biz.domain.mapper.NoteCountDOMapper; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +import java.util.Map; + +@SuppressWarnings("UnstableApiUsage") +@Component +@Slf4j +@RocketMQMessageListener( + consumerGroup = "han_note_group_" + MQConstants.TOPIC_COUNT_NOTE_COLLECT_2_DB, + topic = MQConstants.TOPIC_COUNT_NOTE_COLLECT_2_DB +) +public class CountNoteCollect2DBConsumer implements RocketMQListener { + + // 每秒创建 5000 个令牌 + private final RateLimiter rateLimiter = RateLimiter.create(5000); + @Resource + private NoteCountDOMapper noteCountDOMapper; + + @Override + public void onMessage(String body) { + // 流量削峰:通过获取令牌,如果没有令牌可用,将阻塞,直到获得 + rateLimiter.acquire(); + + log.info("## 消费到了 MQ 【计数: 笔记收藏数入库】, {}...", body); + + Map countMap = null; + try { + countMap = JsonUtils.parseMap(body, Long.class, Integer.class); + } catch (Exception e) { + log.error("## 解析 JSON 字符串异常", e); + } + + if (CollUtil.isNotEmpty(countMap)) { + // 判断数据库中 t_note_count 表,若笔记计数记录不存在,则插入;若记录已存在,则直接更新 + countMap.forEach((k, v) -> noteCountDOMapper.insertOrUpdateCollectTotalByNoteId(v, k)); + } + } +} diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteCollectConsumer.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteCollectConsumer.java new file mode 100644 index 0000000..1539d79 --- /dev/null +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteCollectConsumer.java @@ -0,0 +1,118 @@ +package com.hanserwei.hannote.count.biz.consumer; + +import com.github.phantomthief.collection.BufferTrigger; +import com.google.common.collect.Maps; +import com.hanserwei.framework.common.utils.JsonUtils; +import com.hanserwei.hannote.count.biz.constant.MQConstants; +import com.hanserwei.hannote.count.biz.constant.RedisKeyConstants; +import com.hanserwei.hannote.count.biz.enums.CollectUnCollectNoteTypeEnum; +import com.hanserwei.hannote.count.biz.model.dto.CountCollectUnCollectNoteMqDTO; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +@Component +@Slf4j +@RocketMQMessageListener( + consumerGroup = "han_note_group_" + MQConstants.TOPIC_COUNT_NOTE_COLLECT, + topic = MQConstants.TOPIC_COUNT_NOTE_COLLECT +) +public class CountNoteCollectConsumer implements RocketMQListener { + + @Resource + private RedisTemplate redisTemplate; + @Resource + private RocketMQTemplate rocketMQTemplate; + private final BufferTrigger bufferTrigger = BufferTrigger.batchBlocking() + .bufferSize(50000) // 缓存队列的最大容量 + .batchSize(1000) // 一批次最多聚合 1000 条 + .linger(Duration.ofSeconds(1)) // 多久聚合一次 + .setConsumerEx(this::consumeMessage) // 设置消费者方法 + .build(); + + @Override + public void onMessage(String body) { + // 往 bufferTrigger 中添加元素 + bufferTrigger.enqueue(body); + } + + private void consumeMessage(List bodies) { + log.info("==> 【笔记收藏数】聚合消息, size: {}", bodies.size()); + log.info("==> 【笔记收藏数】聚合消息, {}", JsonUtils.toJsonString(bodies)); + + // List -> List + List countCollectUnCollectNoteMqDTOS = bodies.stream() + .map(body -> JsonUtils.parseObject(body, CountCollectUnCollectNoteMqDTO.class)) + .toList(); + + // 按笔记ID分组 + Map> groupMap = countCollectUnCollectNoteMqDTOS.stream() + .collect(Collectors.groupingBy(CountCollectUnCollectNoteMqDTO::getNoteId)); + // 按组汇总数据,统计出最终的计数 + // key 为笔记 ID, value 为最终操作的计数 + Map countMap = Maps.newHashMap(); + for (Map.Entry> entry : groupMap.entrySet()) { + List list = entry.getValue(); + // 默认计数为0 + int finalCount = 0; + for (CountCollectUnCollectNoteMqDTO countCollectUnCollectNoteMqDTO : list) { + Integer type = countCollectUnCollectNoteMqDTO.getType(); + // 获取枚举类 + CollectUnCollectNoteTypeEnum collectUnCollectNoteTypeEnum = CollectUnCollectNoteTypeEnum.valueOf(type); + switch (Objects.requireNonNull(collectUnCollectNoteTypeEnum)) { + case COLLECT -> finalCount++; + case UN_COLLECT -> finalCount--; + } + } + // 将分组后统计出的最终计数,存入 countMap 中 + countMap.put(entry.getKey(), finalCount); + } + log.info("==> 【笔记收藏数】最终结果, {}", JsonUtils.toJsonString(countMap)); + + // 更新 Redis + countMap.forEach((k, v) -> { + // Redis Hash Key + String redisKey = RedisKeyConstants.buildCountNoteKey(k); + // 判断 Redis 中 Hash 是否存在 + boolean isExisted = redisTemplate.hasKey(redisKey); + + // 若存在才会更新 + // (因为缓存设有过期时间,考虑到过期后,缓存会被删除,这里需要判断一下,存在才会去更新,而初始化工作放在查询计数来做) + if (isExisted) { + // 对目标用户 Hash 中的收藏总数字段进行计数操作 + redisTemplate.opsForHash().increment(redisKey, RedisKeyConstants.FIELD_COLLECT_TOTAL, v); + } + }); + + // 发送 MQ, 笔记收藏数据落库 + Message message = MessageBuilder.withPayload(JsonUtils.toJsonString(countMap)) + .build(); + + // 异步发送 MQ 消息 + rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_COLLECT_2_DB, message, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + log.info("==> 【计数服务:笔记收藏数入库】MQ 发送成功,SendResult: {}", sendResult); + } + + @Override + public void onException(Throwable throwable) { + log.error("==> 【计数服务:笔记收藏数入库】MQ 发送异常: ", throwable); + } + }); + } +} diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteLike2DBConsumer.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteLike2DBConsumer.java index da457cd..4cbf415 100644 --- a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteLike2DBConsumer.java +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteLike2DBConsumer.java @@ -17,7 +17,7 @@ import java.util.Map; @Slf4j @SuppressWarnings({"UnstableApiUsage"}) @RocketMQMessageListener( - consumerGroup = "han_note_" + MQConstants.TOPIC_COUNT_NOTE_LIKE_2_DB, + consumerGroup = "han_note_group_" + MQConstants.TOPIC_COUNT_NOTE_LIKE_2_DB, topic = MQConstants.TOPIC_COUNT_NOTE_LIKE_2_DB ) public class CountNoteLike2DBConsumer implements RocketMQListener { diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteLikeConsumer.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteLikeConsumer.java index 33de50f..4bce40f 100644 --- a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteLikeConsumer.java +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteLikeConsumer.java @@ -27,7 +27,7 @@ import java.util.stream.Collectors; @Component @Slf4j @RocketMQMessageListener( - consumerGroup = "han_note_" + MQConstants.TOPIC_COUNT_NOTE_LIKE, + consumerGroup = "han_note_group_" + MQConstants.TOPIC_COUNT_NOTE_LIKE, topic = MQConstants.TOPIC_COUNT_NOTE_LIKE ) public class CountNoteLikeConsumer implements RocketMQListener { diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/domain/mapper/NoteCountDOMapper.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/domain/mapper/NoteCountDOMapper.java index 4e95c8a..a5bee9e 100644 --- a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/domain/mapper/NoteCountDOMapper.java +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/domain/mapper/NoteCountDOMapper.java @@ -16,4 +16,13 @@ public interface NoteCountDOMapper extends BaseMapper { * @return 影响行数 */ int insertOrUpdateLikeTotalByNoteId(@Param("count") Integer count, @Param("noteId") Long noteId); + + /** + * 添加记录或更新笔记收藏数 + * + * @param count 收藏数 + * @param noteId 笔记ID + * @return 影响行数 + */ + int insertOrUpdateCollectTotalByNoteId(@Param("count") Integer count, @Param("noteId") Long noteId); } \ No newline at end of file diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/enums/CollectUnCollectNoteTypeEnum.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/enums/CollectUnCollectNoteTypeEnum.java new file mode 100644 index 0000000..293ced1 --- /dev/null +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/enums/CollectUnCollectNoteTypeEnum.java @@ -0,0 +1,28 @@ +package com.hanserwei.hannote.count.biz.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.util.Objects; + +@Getter +@AllArgsConstructor +public enum CollectUnCollectNoteTypeEnum { + // 收藏 + COLLECT(1), + // 取消收藏 + UN_COLLECT(0), + ; + + private final Integer code; + + public static CollectUnCollectNoteTypeEnum valueOf(Integer code) { + for (CollectUnCollectNoteTypeEnum collectUnCollectNoteTypeEnum : CollectUnCollectNoteTypeEnum.values()) { + if (Objects.equals(code, collectUnCollectNoteTypeEnum.getCode())) { + return collectUnCollectNoteTypeEnum; + } + } + return null; + } + +} \ No newline at end of file diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/model/dto/CountCollectUnCollectNoteMqDTO.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/model/dto/CountCollectUnCollectNoteMqDTO.java new file mode 100644 index 0000000..9ca5f2b --- /dev/null +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/model/dto/CountCollectUnCollectNoteMqDTO.java @@ -0,0 +1,26 @@ +package com.hanserwei.hannote.count.biz.model.dto; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class CountCollectUnCollectNoteMqDTO { + + private Long userId; + + private Long noteId; + + /** + * 0: 取消收藏, 1:收藏 + */ + private Integer type; + + private LocalDateTime createTime; +} \ No newline at end of file diff --git a/han-note-count/han-note-count-biz/src/main/resources/mapperxml/NoteCountDOMapper.xml b/han-note-count/han-note-count-biz/src/main/resources/mapperxml/NoteCountDOMapper.xml index 1d33e74..ccae724 100644 --- a/han-note-count/han-note-count-biz/src/main/resources/mapperxml/NoteCountDOMapper.xml +++ b/han-note-count/han-note-count-biz/src/main/resources/mapperxml/NoteCountDOMapper.xml @@ -20,4 +20,10 @@ VALUES (#{noteId}, #{count}) ON DUPLICATE KEY UPDATE like_total = like_total + (#{count}); + + + INSERT INTO t_note_count (note_id, collect_total) + VALUES (#{noteId}, #{count}) + ON DUPLICATE KEY UPDATE collect_total = collect_total + (#{count}); + \ No newline at end of file diff --git a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/CollectUnCollectNoteConsumer.java b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/CollectUnCollectNoteConsumer.java index dab7996..a9d4b9a 100644 --- a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/CollectUnCollectNoteConsumer.java +++ b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/CollectUnCollectNoteConsumer.java @@ -8,10 +8,14 @@ import com.hanserwei.hannote.note.biz.domain.mapper.NoteCollectionDOMapper; import com.hanserwei.hannote.note.biz.model.dto.CollectUnCollectNoteMqDTO; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; import java.time.LocalDateTime; @@ -21,7 +25,7 @@ import java.util.Objects; @Component @Slf4j @RocketMQMessageListener( - consumerGroup = "han_note_" + MQConstants.TOPIC_COLLECT_OR_UN_COLLECT, + consumerGroup = "han_note_group_" + MQConstants.TOPIC_COLLECT_OR_UN_COLLECT, topic = MQConstants.TOPIC_COLLECT_OR_UN_COLLECT, consumeMode = ConsumeMode.ORDERLY ) @@ -31,6 +35,8 @@ public class CollectUnCollectNoteConsumer implements RocketMQListener { private final RateLimiter rateLimiter = RateLimiter.create(5000); @Resource private NoteCollectionDOMapper noteCollectionDOMapper; + @Resource + private RocketMQTemplate rocketMQTemplate; @Override public void onMessage(Message message) { @@ -85,7 +91,27 @@ public class CollectUnCollectNoteConsumer implements RocketMQListener { // 取消收藏:记录更新 int count = noteCollectionDOMapper.update2UnCollectByUserIdAndNoteId(noteCollectionDO); - // TODO: 发送计数 MQ + if (count == 0) { + return; + } + + // 更新数据库成功后,发送计数 MQ + org.springframework.messaging.Message message = MessageBuilder.withPayload(bodyJsonStr) + .build(); + + // 异步发送 MQ 消息 + rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_COLLECT, message, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + log.info("==> 【计数: 笔记取消收藏】MQ 发送成功,SendResult: {}", sendResult); + } + + @Override + public void onException(Throwable throwable) { + log.error("==> 【计数: 笔记取消收藏】MQ 发送异常: ", throwable); + } + }); + } /** @@ -119,6 +145,28 @@ public class CollectUnCollectNoteConsumer implements RocketMQListener { // 添加或更新笔记收藏记录 boolean isSuccess = noteCollectionDOMapper.insertOrUpdate(noteCollectionDO); - // TODO: 发送计数 MQ + if (!isSuccess) { + return; + } + + // 发送计数 MQ + + // 更新数据库成功后,发送计数 MQ + org.springframework.messaging.Message message = MessageBuilder.withPayload(bodyJsonStr) + .build(); + + // 异步发送 MQ 消息 + // 异步发送 MQ 消息 + rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_COLLECT, message, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + log.info("==> 【计数: 笔记收藏】MQ 发送成功,SendResult: {}", sendResult); + } + + @Override + public void onException(Throwable throwable) { + log.error("==> 【计数: 笔记收藏】MQ 发送异常: ", throwable); + } + }); } } diff --git a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/LikeUnlikeNoteConsumer.java b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/LikeUnlikeNoteConsumer.java index b143e6d..3a9ee6b 100644 --- a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/LikeUnlikeNoteConsumer.java +++ b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/LikeUnlikeNoteConsumer.java @@ -27,7 +27,7 @@ import java.util.Objects; @SuppressWarnings({"UnstableApiUsage"}) @Component @RocketMQMessageListener( - consumerGroup = "han_note_" + MQConstants.TOPIC_LIKE_OR_UNLIKE, + consumerGroup = "han_note_group_" + MQConstants.TOPIC_LIKE_OR_UNLIKE, topic = MQConstants.TOPIC_LIKE_OR_UNLIKE, consumeMode = ConsumeMode.ORDERLY// 顺序消费 )