diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/constant/MQConstants.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/constant/MQConstants.java index 753250c..9e0de5e 100644 --- a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/constant/MQConstants.java +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/constant/MQConstants.java @@ -22,4 +22,14 @@ public interface MQConstants { */ String TOPIC_COUNT_FOLLOWING_2_DB = "CountFollowing2DBTopic"; + /** + * Topic: 计数 - 笔记点赞数 + */ + String TOPIC_COUNT_NOTE_LIKE = "CountNoteLikeTopic"; + + /** + * Topic: 计数 - 笔记点赞数落库 + */ + String TOPIC_COUNT_NOTE_LIKE_2_DB = "CountNoteLike2DBTTopic"; + } \ No newline at end of file diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/constant/RedisKeyConstants.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/constant/RedisKeyConstants.java index d09f98d..fd5fdea 100644 --- a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/constant/RedisKeyConstants.java +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/constant/RedisKeyConstants.java @@ -6,15 +6,26 @@ public class RedisKeyConstants { * Hash Field: 粉丝总数 */ public static final String FIELD_FANS_TOTAL = "fansTotal"; + /** * Hash Field: 关注总数 */ public static final String FIELD_FOLLOWING_TOTAL = "followingTotal"; + /** * 用户维度计数 Key 前缀 */ private static final String COUNT_USER_KEY_PREFIX = "count:user:"; + /** + * Hash Field: 笔记点赞总数 + */ + public static final String FIELD_LIKE_TOTAL = "likeTotal"; + /** + * 笔记维度计数 Key 前缀 + */ + private static final String COUNT_NOTE_KEY_PREFIX = "count:note:"; + /** * 构建用户维度计数 Key * @@ -25,5 +36,15 @@ public class RedisKeyConstants { return COUNT_USER_KEY_PREFIX + userId; } + /** + * 构建笔记维度计数 Key + * + * @param noteId 笔记ID + * @return 笔记维度计数 Key + */ + public static String buildCountNoteKey(Long noteId) { + return COUNT_NOTE_KEY_PREFIX + noteId; + } + } \ No newline at end of file diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteLike2DBConsumer.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteLike2DBConsumer.java new file mode 100644 index 0000000..da457cd --- /dev/null +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteLike2DBConsumer.java @@ -0,0 +1,49 @@ +package com.hanserwei.hannote.count.biz.consumer; + +import cn.hutool.core.collection.CollUtil; +import com.google.common.util.concurrent.RateLimiter; +import com.hanserwei.framework.common.utils.JsonUtils; +import com.hanserwei.hannote.count.biz.constant.MQConstants; +import com.hanserwei.hannote.count.biz.domain.mapper.NoteCountDOMapper; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +import java.util.Map; + +@Component +@Slf4j +@SuppressWarnings({"UnstableApiUsage"}) +@RocketMQMessageListener( + consumerGroup = "han_note_" + MQConstants.TOPIC_COUNT_NOTE_LIKE_2_DB, + topic = MQConstants.TOPIC_COUNT_NOTE_LIKE_2_DB +) +public class CountNoteLike2DBConsumer implements RocketMQListener { + + // 每秒创建 5000 个令牌 + private final RateLimiter rateLimiter = RateLimiter.create(5000); + @Resource + private NoteCountDOMapper noteCountDOMapper; + + @Override + public void onMessage(String body) { + // 流量削峰:通过获取令牌,如果没有令牌可用,将阻塞,直到获得 + rateLimiter.acquire(); + + log.info("## 消费到了 MQ 【计数: 笔记点赞数入库】, {}...", body); + + Map countMap = null; + try { + countMap = JsonUtils.parseMap(body, Long.class, Integer.class); + } catch (Exception e) { + log.error("## 解析 JSON 字符串异常", e); + } + + if (CollUtil.isNotEmpty(countMap)) { + // 判断数据库中 t_note_count 表,若笔记计数记录不存在,则插入;若记录已存在,则直接更新 + countMap.forEach((k, v) -> noteCountDOMapper.insertOrUpdateLikeTotalByNoteId(v, k)); + } + } +} diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteLikeConsumer.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteLikeConsumer.java new file mode 100644 index 0000000..33de50f --- /dev/null +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteLikeConsumer.java @@ -0,0 +1,115 @@ +package com.hanserwei.hannote.count.biz.consumer; + +import com.github.phantomthief.collection.BufferTrigger; +import com.google.common.collect.Maps; +import com.hanserwei.framework.common.utils.JsonUtils; +import com.hanserwei.hannote.count.biz.constant.MQConstants; +import com.hanserwei.hannote.count.biz.constant.RedisKeyConstants; +import com.hanserwei.hannote.count.biz.enums.LikeUnlikeNoteTypeEnum; +import com.hanserwei.hannote.count.biz.model.dto.CountLikeUnlikeNoteMqDTO; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +@Component +@Slf4j +@RocketMQMessageListener( + consumerGroup = "han_note_" + MQConstants.TOPIC_COUNT_NOTE_LIKE, + topic = MQConstants.TOPIC_COUNT_NOTE_LIKE +) +public class CountNoteLikeConsumer implements RocketMQListener { + + @Resource + private RedisTemplate redisTemplate; + @Resource + private RocketMQTemplate rocketMQTemplate; + + private final BufferTrigger bufferTrigger = BufferTrigger.batchBlocking() + .bufferSize(50000) // 缓存队列的最大容量 + .batchSize(1000) // 一批次最多聚合 1000 条 + .linger(Duration.ofSeconds(1)) // 多久聚合一次 + .setConsumerEx(this::consumeMessage) // 设置消费者方法 + .build(); + + @Override + public void onMessage(String body) { + // 往 bufferTrigger 中添加元素 + bufferTrigger.enqueue(body); + } + + private void consumeMessage(List bodies) { + log.info("==> 【笔记点赞数】聚合消息, size: {}", bodies.size()); + log.info("==> 【笔记点赞数】聚合消息, {}", JsonUtils.toJsonString(bodies)); + List countLikeUnlikeNoteMqDTOS = bodies.stream() + .map(body -> JsonUtils.parseObject(body, CountLikeUnlikeNoteMqDTO.class)).toList(); + // 按笔记ID分组 + Map> groupMap = countLikeUnlikeNoteMqDTOS.stream() + .collect(Collectors.groupingBy(CountLikeUnlikeNoteMqDTO::getNoteId)); + + // 按组汇总统计处最终计数 + // key为笔记ID,value为最终操作计数 + Map countMap = Maps.newHashMap(); + for (Map.Entry> entry : groupMap.entrySet()) { + List list = entry.getValue(); + // 最终计数默认为0 + int finalCount = 0; + for (CountLikeUnlikeNoteMqDTO countLikeUnlikeNoteMqDTO : list) { + Integer type = countLikeUnlikeNoteMqDTO.getType(); + LikeUnlikeNoteTypeEnum likeUnlikeNoteTypeEnum = LikeUnlikeNoteTypeEnum.valueOf(type); + if (likeUnlikeNoteTypeEnum == null) { + continue; + } + switch (likeUnlikeNoteTypeEnum) { + case LIKE -> finalCount++; + case UNLIKE -> finalCount--; + } + } + countMap.put(entry.getKey(), finalCount); + } + log.info("## 【笔记点赞数】聚合后的计数数据: {}", JsonUtils.toJsonString(countMap)); + // 更新Redis + countMap.forEach((k, v) -> { + // Redis Key + String redisKey = RedisKeyConstants.buildCountNoteKey(k); + // 判断 Redis 中 Hash 是否存在 + boolean isExisted = redisTemplate.hasKey(redisKey); + + // 若存在才会更新 + // (因为缓存设有过期时间,考虑到过期后,缓存会被删除,这里需要判断一下,存在才会去更新,而初始化工作放在查询计数来做) + if (isExisted) { + // 对目标用户 Hash 中的点赞数字段进行计数操作 + redisTemplate.opsForHash().increment(redisKey, RedisKeyConstants.FIELD_LIKE_TOTAL, v); + } + }); + + // 发送 MQ, 笔记点赞数据落库 + Message message = MessageBuilder.withPayload(JsonUtils.toJsonString(countMap)) + .build(); + + // 异步发送 MQ 消息 + rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_LIKE_2_DB, message, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + log.info("==> 【计数服务:笔记点赞数入库】MQ 发送成功,SendResult: {}", sendResult); + } + + @Override + public void onException(Throwable throwable) { + log.error("==> 【计数服务:笔记点赞数入库】MQ 发送异常: ", throwable); + } + }); + } +} diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/domain/mapper/NoteCountDOMapper.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/domain/mapper/NoteCountDOMapper.java index 8b2d467..4e95c8a 100644 --- a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/domain/mapper/NoteCountDOMapper.java +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/domain/mapper/NoteCountDOMapper.java @@ -3,7 +3,17 @@ package com.hanserwei.hannote.count.biz.domain.mapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.hanserwei.hannote.count.biz.domain.dataobject.NoteCountDO; import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; @Mapper public interface NoteCountDOMapper extends BaseMapper { + + /** + * 添加笔记计数记录或更新笔记点赞数 + * + * @param count 计数 + * @param noteId 笔记ID + * @return 影响行数 + */ + int insertOrUpdateLikeTotalByNoteId(@Param("count") Integer count, @Param("noteId") Long noteId); } \ No newline at end of file diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/enums/LikeUnlikeNoteTypeEnum.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/enums/LikeUnlikeNoteTypeEnum.java new file mode 100644 index 0000000..44f2020 --- /dev/null +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/enums/LikeUnlikeNoteTypeEnum.java @@ -0,0 +1,28 @@ +package com.hanserwei.hannote.count.biz.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.util.Objects; + +@Getter +@AllArgsConstructor +public enum LikeUnlikeNoteTypeEnum { + // 点赞 + LIKE(1), + // 取消点赞 + UNLIKE(0), + ; + + private final Integer code; + + public static LikeUnlikeNoteTypeEnum valueOf(Integer code) { + for (LikeUnlikeNoteTypeEnum likeUnlikeNoteTypeEnum : LikeUnlikeNoteTypeEnum.values()) { + if (Objects.equals(code, likeUnlikeNoteTypeEnum.getCode())) { + return likeUnlikeNoteTypeEnum; + } + } + return null; + } + +} \ No newline at end of file diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/model/dto/CountLikeUnlikeNoteMqDTO.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/model/dto/CountLikeUnlikeNoteMqDTO.java new file mode 100644 index 0000000..a01bf7c --- /dev/null +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/model/dto/CountLikeUnlikeNoteMqDTO.java @@ -0,0 +1,26 @@ +package com.hanserwei.hannote.count.biz.model.dto; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class CountLikeUnlikeNoteMqDTO { + + private Long userId; + + private Long noteId; + + /** + * 0: 取消点赞, 1:点赞 + */ + private Integer type; + + private LocalDateTime createTime; +} \ No newline at end of file diff --git a/han-note-count/han-note-count-biz/src/main/resources/mapperxml/NoteCountDOMapper.xml b/han-note-count/han-note-count-biz/src/main/resources/mapperxml/NoteCountDOMapper.xml index b85c620..1d33e74 100644 --- a/han-note-count/han-note-count-biz/src/main/resources/mapperxml/NoteCountDOMapper.xml +++ b/han-note-count/han-note-count-biz/src/main/resources/mapperxml/NoteCountDOMapper.xml @@ -14,4 +14,10 @@ id, note_id, like_total, collect_total, comment_total + + + INSERT INTO t_note_count (note_id, like_total) + VALUES (#{noteId}, #{count}) + ON DUPLICATE KEY UPDATE like_total = like_total + (#{count}); + \ No newline at end of file diff --git a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/LikeUnlikeNoteConsumer.java b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/LikeUnlikeNoteConsumer.java index 07d48de..b143e6d 100644 --- a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/LikeUnlikeNoteConsumer.java +++ b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/LikeUnlikeNoteConsumer.java @@ -11,10 +11,14 @@ import com.hanserwei.hannote.note.biz.model.dto.LikeUnlikeNoteMqDTO; import com.hanserwei.hannote.note.biz.service.NoteLikeDOService; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; import java.time.LocalDateTime; @@ -36,6 +40,8 @@ public class LikeUnlikeNoteConsumer implements RocketMQListener { private NoteLikeDOMapper noteLikeDOMapper; @Resource private NoteLikeDOService noteLikeDOService; + @Resource + private RocketMQTemplate rocketMQTemplate; @Override public void onMessage(Message message) { @@ -94,7 +100,25 @@ public class LikeUnlikeNoteConsumer implements RocketMQListener { // 执行更新 boolean update = noteLikeDOService.update(updateEntity, wrapper); - // TODO: 删除计数 + if (!update) { + return; + } + // 更新数据库成功后,发送计数 MQ + org.springframework.messaging.Message message = MessageBuilder.withPayload(bodyJsonStr) + .build(); + + // 异步发送 MQ 消息 + rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_LIKE, message, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + log.info("==> 【计数: 笔记取消点赞】MQ 发送成功,SendResult: {}", sendResult); + } + + @Override + public void onException(Throwable throwable) { + log.error("==> 【计数: 笔记取消点赞】MQ 发送异常: ", throwable); + } + }); } /** @@ -128,7 +152,27 @@ public class LikeUnlikeNoteConsumer implements RocketMQListener { // 添加或更新笔记点赞记录 boolean count = noteLikeDOMapper.insertOrUpdate(noteLikeDO); - // TODO: 发送计数 MQ + if (!count) { + return; + } + + // 发送计数 MQ + // 更新数据库成功后,发送计数 MQ + org.springframework.messaging.Message message = MessageBuilder.withPayload(bodyJsonStr) + .build(); + + // 异步发送 MQ 消息 + rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_LIKE, message, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + log.info("==> 【计数: 笔记点赞】MQ 发送成功,SendResult: {}", sendResult); + } + + @Override + public void onException(Throwable throwable) { + log.error("==> 【计数: 笔记点赞】MQ 发送异常: ", throwable); + } + }); } } diff --git a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/constant/MQConstants.java b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/constant/MQConstants.java index a3f209e..afca2b3 100644 --- a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/constant/MQConstants.java +++ b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/constant/MQConstants.java @@ -17,6 +17,11 @@ public interface MQConstants { */ String TOPIC_LIKE_OR_UNLIKE = "LikeUnlikeTopic"; + /** + * Topic: 计数 - 笔记点赞数 + */ + String TOPIC_COUNT_NOTE_LIKE = "CountNoteLikeTopic"; + /** * 点赞标签 */