diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/constant/MQConstants.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/constant/MQConstants.java index 65c2ba3..21ce583 100644 --- a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/constant/MQConstants.java +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/constant/MQConstants.java @@ -2,6 +2,16 @@ package com.hanserwei.hannote.count.biz.constant; public interface MQConstants { + /** + * Topic: 计数 - 笔记点赞数 + */ + String TOPIC_LIKE_OR_UNLIKE = "LikeUnlikeTopic"; + + /** + * Topic: 笔记收藏、取消收藏 + */ + String TOPIC_COLLECT_OR_UN_COLLECT = "CollectUnCollectTopic"; + /** * Topic: 关注数计数 */ diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteCollectConsumer.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteCollectConsumer.java index 34c2cbd..c824663 100644 --- a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteCollectConsumer.java +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteCollectConsumer.java @@ -29,8 +29,8 @@ import java.util.stream.Collectors; @Component @Slf4j @RocketMQMessageListener( - consumerGroup = "han_note_group_" + MQConstants.TOPIC_COUNT_NOTE_COLLECT, - topic = MQConstants.TOPIC_COUNT_NOTE_COLLECT + consumerGroup = "han_note_group_" + MQConstants.TOPIC_COLLECT_OR_UN_COLLECT, + topic = MQConstants.TOPIC_COLLECT_OR_UN_COLLECT ) public class CountNoteCollectConsumer implements RocketMQListener { diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteLikeConsumer.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteLikeConsumer.java index 14a4077..12c7a29 100644 --- a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteLikeConsumer.java +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteLikeConsumer.java @@ -28,8 +28,8 @@ import java.util.stream.Collectors; @Component @Slf4j @RocketMQMessageListener( - consumerGroup = "han_note_group_" + MQConstants.TOPIC_COUNT_NOTE_LIKE, - topic = MQConstants.TOPIC_COUNT_NOTE_LIKE + consumerGroup = "han_note_group_" + MQConstants.TOPIC_LIKE_OR_UNLIKE, + topic = MQConstants.TOPIC_LIKE_OR_UNLIKE ) public class CountNoteLikeConsumer implements RocketMQListener { diff --git a/han-note-note/han-note-note-biz/pom.xml b/han-note-note/han-note-note-biz/pom.xml index 8272190..0764b26 100644 --- a/han-note-note/han-note-note-biz/pom.xml +++ b/han-note-note/han-note-note-biz/pom.xml @@ -118,6 +118,13 @@ rocketmq-spring-boot-starter + + + org.apache.rocketmq + rocketmq-client + + + diff --git a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/CollectUnCollectNoteConsumer.java b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/CollectUnCollectNoteConsumer.java index a9d4b9a..7efe22b 100644 --- a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/CollectUnCollectNoteConsumer.java +++ b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/CollectUnCollectNoteConsumer.java @@ -1,172 +1,154 @@ package com.hanserwei.hannote.note.biz.comsumer; +import cn.hutool.core.collection.CollUtil; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.RateLimiter; import com.hanserwei.framework.common.utils.JsonUtils; import com.hanserwei.hannote.note.biz.constant.MQConstants; import com.hanserwei.hannote.note.biz.domain.dataobject.NoteCollectionDO; import com.hanserwei.hannote.note.biz.domain.mapper.NoteCollectionDOMapper; import com.hanserwei.hannote.note.biz.model.dto.CollectUnCollectNoteMqDTO; +import jakarta.annotation.PreDestroy; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.client.producer.SendCallback; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.spring.annotation.ConsumeMode; -import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; -import org.apache.rocketmq.spring.core.RocketMQListener; -import org.apache.rocketmq.spring.core.RocketMQTemplate; -import org.springframework.messaging.support.MessageBuilder; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; -import java.time.LocalDateTime; +import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; -@SuppressWarnings({"UnstableApiUsage", "DuplicatedCode"}) +@SuppressWarnings({"UnstableApiUsage"}) @Component @Slf4j -@RocketMQMessageListener( - consumerGroup = "han_note_group_" + MQConstants.TOPIC_COLLECT_OR_UN_COLLECT, - topic = MQConstants.TOPIC_COLLECT_OR_UN_COLLECT, - consumeMode = ConsumeMode.ORDERLY -) -public class CollectUnCollectNoteConsumer implements RocketMQListener { +public class CollectUnCollectNoteConsumer { - // 每秒创建 5000 个令牌 + // 每秒创建5000个令牌 private final RateLimiter rateLimiter = RateLimiter.create(5000); + @Value("${rocketmq.name-server}") + private String nameServer; + @Resource + private DefaultMQPushConsumer consumer; @Resource private NoteCollectionDOMapper noteCollectionDOMapper; - @Resource - private RocketMQTemplate rocketMQTemplate; - @Override - public void onMessage(Message message) { - // 流量削峰:通过获取令牌,如果没有令牌可用,将阻塞,直到获得 - rateLimiter.acquire(); + @Bean(name = "CollectUnCollectNoteConsumer") + public DefaultMQPushConsumer mqPushConsumer() throws MQClientException { + // Group组 + String group = "han_note_group_" + MQConstants.TOPIC_COLLECT_OR_UN_COLLECT; - // 幂等性: 通过联合唯一索引保证 + // 创建一个新的 DefaultMQPushConsumer 实例,并指定消费者的消费组名 + consumer = new DefaultMQPushConsumer(group); - // 消息体 - String bodyJsonStr = new String(message.getBody()); - // 标签 - String tags = message.getTags(); + // 设置 RocketMQ 的 NameServer 地址 + consumer.setNamesrvAddr(nameServer); - log.info("==> CollectUnCollectNoteConsumer 消费了消息 {}, tags: {}", bodyJsonStr, tags); + // 订阅指定的主题,并设置主题的订阅规则("*" 表示订阅所有标签的消息) + consumer.subscribe(MQConstants.TOPIC_COLLECT_OR_UN_COLLECT, "*"); - // 根据 MQ 标签,判断操作类型 - if (Objects.equals(tags, MQConstants.TAG_COLLECT)) { // 收藏笔记 - handleCollectNoteTagMessage(bodyJsonStr); - } else if (Objects.equals(tags, MQConstants.TAG_UN_COLLECT)) { // 取消收藏笔记 - handleUnCollectNoteTagMessage(bodyJsonStr); - } - } + // 设置消费者消费消息的起始位置,如果队列中没有消息,则从最新的消息开始消费。 + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); - /** - * 处理取消收藏笔记的 MQ 消息 - * - * @param bodyJsonStr 消息体 - */ - private void handleUnCollectNoteTagMessage(String bodyJsonStr) { - // 消息体 JSON 字符串转 DTO - CollectUnCollectNoteMqDTO unCollectNoteMqDTO = JsonUtils.parseObject(bodyJsonStr, CollectUnCollectNoteMqDTO.class); + // 设置消息消费模式,这里使用集群模式 (CLUSTERING) + consumer.setMessageModel(MessageModel.CLUSTERING); - if (Objects.isNull(unCollectNoteMqDTO)) return; + // 最大重试次数, 以防消息重试过多次仍然没有成功,避免消息卡在消费队列中。 + consumer.setMaxReconsumeTimes(3); + // 设置每批次消费的最大消息数量,这里设置为 30,表示每次拉取时最多消费 30 条消息。 + consumer.setConsumeMessageBatchMaxSize(30); + // 设置拉取间隔, 单位毫秒 + consumer.setPullInterval(1000); - // 用户ID - Long userId = unCollectNoteMqDTO.getUserId(); - // 收藏的笔记ID - Long noteId = unCollectNoteMqDTO.getNoteId(); - // 操作类型 - Integer type = unCollectNoteMqDTO.getType(); - // 收藏时间 - LocalDateTime createTime = unCollectNoteMqDTO.getCreateTime(); + // 注册消息监听器 + consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> { + log.info("==> 【笔记收藏、取消收藏】本批次消息大小: {}", msgs.size()); + try { + // 令牌桶流控, 以控制数据库能够承受的 QPS + rateLimiter.acquire(); - // 构建 DO 对象 - NoteCollectionDO noteCollectionDO = NoteCollectionDO.builder() - .userId(userId) - .noteId(noteId) - .createTime(createTime) - .status(type) - .build(); + // 幂等性: 通过联合唯一索引保证 - // 取消收藏:记录更新 - int count = noteCollectionDOMapper.update2UnCollectByUserIdAndNoteId(noteCollectionDO); + // 消息体 Json 字符串转 DTO + List collectUnCollectNoteMqDTOS = Lists.newArrayList(); + msgs.forEach(msg -> { + String msgJson = new String(msg.getBody()); + log.info("==> Consumer - Received message: {}", msgJson); + collectUnCollectNoteMqDTOS.add(JsonUtils.parseObject(msgJson, CollectUnCollectNoteMqDTO.class)); + }); - if (count == 0) { - return; - } + // 1.内存级操作合并 + //按用户ID分组 + Map> groupMap = collectUnCollectNoteMqDTOS.stream() + .collect(Collectors.groupingBy(CollectUnCollectNoteMqDTO::getUserId)); + //对每个用户按照用户ID分组并且过滤合并 + // 对每个用户的操作按 noteId 二次分组,并过滤合并 + List finalOperations = groupMap.values().stream() + .flatMap(userOperations -> { + // 按 noteId 分组 + Map> noteGroupMap = userOperations.stream() + .collect(Collectors.groupingBy(CollectUnCollectNoteMqDTO::getNoteId)); - // 更新数据库成功后,发送计数 MQ - org.springframework.messaging.Message message = MessageBuilder.withPayload(bodyJsonStr) - .build(); + // 处理每个 noteId 的分组 + // 取最后一次操作(消息是有序的) + return noteGroupMap.values().stream() + .filter(operations -> { + int size = operations.size(); + // 根据奇偶性判断是否需要处理 + // 偶数次操作:最终状态抵消,无需写入 + // 奇数次操作:保留最后一次操作 + return size % 2 != 0; + }) + .map(List::getLast); + }) + .toList(); - // 异步发送 MQ 消息 - rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_COLLECT, message, new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - log.info("==> 【计数: 笔记取消收藏】MQ 发送成功,SendResult: {}", sendResult); - } + // 2. 批量写入数据库 + if (CollUtil.isNotEmpty(finalOperations)) { + // DTO 转 DO + List noteCollectionDOS = finalOperations.stream() + .map(finalOperation -> NoteCollectionDO.builder() + .userId(finalOperation.getUserId()) + .noteId(finalOperation.getNoteId()) + .createTime(finalOperation.getCreateTime()) + .status(finalOperation.getType()) + .build()) + .toList(); - @Override - public void onException(Throwable throwable) { - log.error("==> 【计数: 笔记取消收藏】MQ 发送异常: ", throwable); + // 批量写入 + noteCollectionDOMapper.batchInsertOrUpdate(noteCollectionDOS); + } + + + // 手动 ACK,告诉 RocketMQ 这批次消息消费成功 + return ConsumeOrderlyStatus.SUCCESS; + } catch (Exception e) { + log.error("", e); + // 这样 RocketMQ 会暂停当前队列的消费一段时间,再重试 + return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } }); + // 启动消费者 + consumer.start(); + return consumer; } - /** - * 处理收藏笔记的 MQ 消息 - * - * @param bodyJsonStr 消息体 - */ - private void handleCollectNoteTagMessage(String bodyJsonStr) { - // 消息体 JSON 字符串转 DTO - CollectUnCollectNoteMqDTO collectUnCollectNoteMqDTO = JsonUtils.parseObject(bodyJsonStr, CollectUnCollectNoteMqDTO.class); - - if (Objects.isNull(collectUnCollectNoteMqDTO)) return; - - // 用户ID - Long userId = collectUnCollectNoteMqDTO.getUserId(); - // 收藏的笔记ID - Long noteId = collectUnCollectNoteMqDTO.getNoteId(); - // 操作类型 - Integer type = collectUnCollectNoteMqDTO.getType(); - // 收藏时间 - LocalDateTime createTime = collectUnCollectNoteMqDTO.getCreateTime(); - - // 构建 DO 对象 - NoteCollectionDO noteCollectionDO = NoteCollectionDO.builder() - .userId(userId) - .noteId(noteId) - .createTime(createTime) - .status(type) - .build(); - - // 添加或更新笔记收藏记录 - boolean isSuccess = noteCollectionDOMapper.insertOrUpdate(noteCollectionDO); - - if (!isSuccess) { - return; + @PreDestroy + public void destroy() { + if (Objects.nonNull(consumer)) { + try { + consumer.shutdown(); // 关闭消费者 + } catch (Exception e) { + log.error("", e); + } } - - // 发送计数 MQ - - // 更新数据库成功后,发送计数 MQ - org.springframework.messaging.Message message = MessageBuilder.withPayload(bodyJsonStr) - .build(); - - // 异步发送 MQ 消息 - // 异步发送 MQ 消息 - rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_COLLECT, message, new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - log.info("==> 【计数: 笔记收藏】MQ 发送成功,SendResult: {}", sendResult); - } - - @Override - public void onException(Throwable throwable) { - log.error("==> 【计数: 笔记收藏】MQ 发送异常: ", throwable); - } - }); } } diff --git a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/LikeUnlikeNoteConsumer.java b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/LikeUnlikeNoteConsumer.java index c6e45d2..b8045a0 100644 --- a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/LikeUnlikeNoteConsumer.java +++ b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/LikeUnlikeNoteConsumer.java @@ -1,179 +1,154 @@ package com.hanserwei.hannote.note.biz.comsumer; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import cn.hutool.core.collection.CollUtil; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.RateLimiter; import com.hanserwei.framework.common.utils.JsonUtils; import com.hanserwei.hannote.note.biz.constant.MQConstants; import com.hanserwei.hannote.note.biz.domain.dataobject.NoteLikeDO; import com.hanserwei.hannote.note.biz.domain.mapper.NoteLikeDOMapper; -import com.hanserwei.hannote.note.biz.enums.LikeUnlikeNoteTypeEnum; import com.hanserwei.hannote.note.biz.model.dto.LikeUnlikeNoteMqDTO; -import com.hanserwei.hannote.note.biz.service.NoteLikeDOService; +import jakarta.annotation.PreDestroy; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.client.producer.SendCallback; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.spring.annotation.ConsumeMode; -import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; -import org.apache.rocketmq.spring.core.RocketMQListener; -import org.apache.rocketmq.spring.core.RocketMQTemplate; -import org.springframework.messaging.support.MessageBuilder; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; -import java.time.LocalDateTime; +import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; @SuppressWarnings({"UnstableApiUsage"}) @Component -@RocketMQMessageListener( - consumerGroup = "han_note_group_" + MQConstants.TOPIC_LIKE_OR_UNLIKE, - topic = MQConstants.TOPIC_LIKE_OR_UNLIKE, - consumeMode = ConsumeMode.ORDERLY// 顺序消费 -) @Slf4j -public class LikeUnlikeNoteConsumer implements RocketMQListener { +public class LikeUnlikeNoteConsumer { - // 每秒创建 5000 个令牌 + // 每秒创建5000个令牌 private final RateLimiter rateLimiter = RateLimiter.create(5000); + @Value("${rocketmq.name-server}") + private String nameServer; + @Resource + private DefaultMQPushConsumer consumer; @Resource private NoteLikeDOMapper noteLikeDOMapper; - @Resource - private NoteLikeDOService noteLikeDOService; - @Resource - private RocketMQTemplate rocketMQTemplate; - @Override - public void onMessage(Message message) { - // 流量削峰:通过获取令牌,如果没有令牌可用,将阻塞,直到获得 - rateLimiter.acquire(); + @Bean(name = "LikeUnLikeNoteConsumer") + public DefaultMQPushConsumer mqPushConsumer() throws MQClientException { + // Group组 + String group = "han_note_group_" + MQConstants.TOPIC_LIKE_OR_UNLIKE; - // 幂等性,通过联合索引保证 + // 创建一个新的 DefaultMQPushConsumer 实例,并指定消费者的消费组名 + consumer = new DefaultMQPushConsumer(group); - // 消息体 - String bodyJsonStr = new String(message.getBody()); - // 标签 - String tags = message.getTags(); + // 设置 RocketMQ 的 NameServer 地址 + consumer.setNamesrvAddr(nameServer); - log.info("==> LikeUnlikeNoteConsumer 消费了消息 {}, tags: {}", bodyJsonStr, tags); + // 订阅指定的主题,并设置主题的订阅规则("*" 表示订阅所有标签的消息) + consumer.subscribe(MQConstants.TOPIC_LIKE_OR_UNLIKE, "*"); - // 根据 MQ 标签,判断操作类型 - if (Objects.equals(tags, MQConstants.TAG_LIKE)) { // 点赞笔记 - handleLikeNoteTagMessage(bodyJsonStr); - } else if (Objects.equals(tags, MQConstants.TAG_UNLIKE)) { // 取消点赞笔记 - handleUnlikeNoteTagMessage(bodyJsonStr); - } - } + // 设置消费者消费消息的起始位置,如果队列中没有消息,则从最新的消息开始消费。 + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); - /** - * 处理取消点赞笔记的 MQ 消息 - * - * @param bodyJsonStr 消息体 - */ - private void handleUnlikeNoteTagMessage(String bodyJsonStr) { - // 消息体 JSON 字符串转 DTO - LikeUnlikeNoteMqDTO unlikeNoteMqDTO = JsonUtils.parseObject(bodyJsonStr, LikeUnlikeNoteMqDTO.class); - if (Objects.isNull(unlikeNoteMqDTO)) { - return; - } - // 用户ID - Long userId = unlikeNoteMqDTO.getUserId(); - // 点赞的笔记ID - Long noteId = unlikeNoteMqDTO.getNoteId(); - // 操作类型 - Integer type = unlikeNoteMqDTO.getType(); - // 取消点赞时间 - LocalDateTime createTime = unlikeNoteMqDTO.getCreateTime(); + // 设置消息消费模式,这里使用集群模式 (CLUSTERING) + consumer.setMessageModel(MessageModel.CLUSTERING); - // 设置要更新的字段值 - NoteLikeDO updateEntity = NoteLikeDO.builder() - .createTime(createTime) // 更新时间 - .status(type) // 设置新的状态值 (例如 0 表示取消点赞) - .build(); + // 最大重试次数, 以防消息重试过多次仍然没有成功,避免消息卡在消费队列中。 + consumer.setMaxReconsumeTimes(3); + // 设置每批次消费的最大消息数量,这里设置为 30,表示每次拉取时最多消费 30 条消息。 + consumer.setConsumeMessageBatchMaxSize(30); + // 设置拉取间隔, 单位毫秒 + consumer.setPullInterval(1000); - // 设置更新条件:where user_id = [userId] and note_id = [noteId] and status = 1 - LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); - wrapper.eq(NoteLikeDO::getUserId, userId) - .eq(NoteLikeDO::getNoteId, noteId) - .eq(NoteLikeDO::getStatus, LikeUnlikeNoteTypeEnum.LIKE.getCode()); // 确保只更新当前为“已点赞”的记录 + // 注册消息监听器 + consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> { + log.info("==> 【笔记点赞、取消点赞】本批次消息大小: {}", msgs.size()); + try { + // 令牌桶流控, 以控制数据库能够承受的 QPS + rateLimiter.acquire(); - // 执行更新 - boolean update = noteLikeDOService.update(updateEntity, wrapper); - log.info("==> 【取消点赞笔记】更新数据库成功,update: {}", update); + // 幂等性: 通过联合唯一索引保证 - if (!update) { - return; - } - // 更新数据库成功后,发送计数 MQ - org.springframework.messaging.Message message = MessageBuilder.withPayload(bodyJsonStr) - .build(); + // 消息体 Json 字符串转 DTO + List likeUnlikeNoteMqDTOS = Lists.newArrayList(); + msgs.forEach(msg -> { + String msgJson = new String(msg.getBody()); + log.info("==> Consumer - Received message: {}", msgJson); + likeUnlikeNoteMqDTOS.add(JsonUtils.parseObject(msgJson, LikeUnlikeNoteMqDTO.class)); + }); - // 异步发送 MQ 消息 - rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_LIKE, message, new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - log.info("==> 【计数: 笔记取消点赞】MQ 发送成功,SendResult: {}", sendResult); - } + // 1.内存级操作合并 + //按用户ID分组 + Map> groupMap = likeUnlikeNoteMqDTOS.stream() + .collect(Collectors.groupingBy(LikeUnlikeNoteMqDTO::getUserId)); + //对每个用户按照用户ID分组并且过滤合并 + // 对每个用户的操作按 noteId 二次分组,并过滤合并 + List finalOperations = groupMap.values().stream() + .flatMap(userOperations -> { + // 按 noteId 分组 + Map> noteGroupMap = userOperations.stream() + .collect(Collectors.groupingBy(LikeUnlikeNoteMqDTO::getNoteId)); - @Override - public void onException(Throwable throwable) { - log.error("==> 【计数: 笔记取消点赞】MQ 发送异常: ", throwable); - } - }); - } + // 处理每个 noteId 的分组 + // 取最后一次操作(消息是有序的) + return noteGroupMap.values().stream() + .filter(operations -> { + int size = operations.size(); + // 根据奇偶性判断是否需要处理 + // 偶数次操作:最终状态抵消,无需写入 + // 奇数次操作:保留最后一次操作 + return size % 2 != 0; + }) + .map(List::getLast); + }) + .toList(); - /** - * 处理点赞笔记的 MQ 消息 - * - * @param bodyJsonStr 消息体 - */ - private void handleLikeNoteTagMessage(String bodyJsonStr) { - // 消息体 JSON 字符串转 DTO - LikeUnlikeNoteMqDTO likeNoteMqDTO = JsonUtils.parseObject(bodyJsonStr, LikeUnlikeNoteMqDTO.class); + // 2. 批量写入数据库 + if (CollUtil.isNotEmpty(finalOperations)) { + // DTO 转 DO + List noteLikeDOS = finalOperations.stream() + .map(finalOperation -> NoteLikeDO.builder() + .userId(finalOperation.getUserId()) + .noteId(finalOperation.getNoteId()) + .createTime(finalOperation.getCreateTime()) + .status(finalOperation.getType()) + .build()) + .toList(); - if (Objects.isNull(likeNoteMqDTO)) return; + // 批量写入 + noteLikeDOMapper.batchInsertOrUpdate(noteLikeDOS); + } - // 用户ID - Long userId = likeNoteMqDTO.getUserId(); - // 点赞的笔记ID - Long noteId = likeNoteMqDTO.getNoteId(); - // 操作类型 - Integer type = likeNoteMqDTO.getType(); - // 点赞时间 - LocalDateTime createTime = likeNoteMqDTO.getCreateTime(); - // 构建 DO 对象 - NoteLikeDO noteLikeDO = NoteLikeDO.builder() - .userId(userId) - .noteId(noteId) - .createTime(createTime) - .status(type) - .build(); - - // 添加或更新笔记点赞记录 - boolean count = noteLikeDOMapper.insertOrUpdate(noteLikeDO); - - if (!count) { - return; - } - - // 发送计数 MQ - // 更新数据库成功后,发送计数 MQ - org.springframework.messaging.Message message = MessageBuilder.withPayload(bodyJsonStr) - .build(); - - // 异步发送 MQ 消息 - rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_LIKE, message, new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - log.info("==> 【计数: 笔记点赞】MQ 发送成功,SendResult: {}", sendResult); - } - - @Override - public void onException(Throwable throwable) { - log.error("==> 【计数: 笔记点赞】MQ 发送异常: ", throwable); + // 手动 ACK,告诉 RocketMQ 这批次消息消费成功 + return ConsumeOrderlyStatus.SUCCESS; + } catch (Exception e) { + log.error("", e); + // 这样 RocketMQ 会暂停当前队列的消费一段时间,再重试 + return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } }); + // 启动消费者 + consumer.start(); + return consumer; + } + + @PreDestroy + public void destroy() { + if (Objects.nonNull(consumer)) { + try { + consumer.shutdown(); // 关闭消费者 + } catch (Exception e) { + log.error("", e); + } + } } } diff --git a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/constant/MQConstants.java b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/constant/MQConstants.java index 31e5793..04b7b93 100644 --- a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/constant/MQConstants.java +++ b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/constant/MQConstants.java @@ -17,21 +17,11 @@ public interface MQConstants { */ String TOPIC_LIKE_OR_UNLIKE = "LikeUnlikeTopic"; - /** - * Topic: 计数 - 笔记点赞数 - */ - String TOPIC_COUNT_NOTE_LIKE = "CountNoteLikeTopic"; - /** * Topic: 收藏、取消收藏共用一个 */ String TOPIC_COLLECT_OR_UN_COLLECT = "CollectUnCollectTopic"; - /** - * Topic: 计数 - 笔记收藏数 - */ - String TOPIC_COUNT_NOTE_COLLECT = "CountNoteCollectTopic"; - /** * Topic: 笔记操作(发布、删除) */ diff --git a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/domain/mapper/NoteCollectionDOMapper.java b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/domain/mapper/NoteCollectionDOMapper.java index 0379ba2..8849d1e 100644 --- a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/domain/mapper/NoteCollectionDOMapper.java +++ b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/domain/mapper/NoteCollectionDOMapper.java @@ -3,6 +3,9 @@ package com.hanserwei.hannote.note.biz.domain.mapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.hanserwei.hannote.note.biz.domain.dataobject.NoteCollectionDO; import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; + +import java.util.List; @Mapper public interface NoteCollectionDOMapper extends BaseMapper { @@ -21,4 +24,12 @@ public interface NoteCollectionDOMapper extends BaseMapper { * @return 影响行数 */ int update2UnCollectByUserIdAndNoteId(NoteCollectionDO noteCollectionDO); + + /** + * 批量新增笔记收藏记录 + * + * @param noteCollectionDOS 笔记收藏记录 + * @return 影响行数 + */ + int batchInsertOrUpdate(@Param("noteCollectionDOS") List noteCollectionDOS); } \ No newline at end of file diff --git a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/domain/mapper/NoteLikeDOMapper.java b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/domain/mapper/NoteLikeDOMapper.java index 665b4d4..59db1f1 100644 --- a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/domain/mapper/NoteLikeDOMapper.java +++ b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/domain/mapper/NoteLikeDOMapper.java @@ -3,6 +3,9 @@ package com.hanserwei.hannote.note.biz.domain.mapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.hanserwei.hannote.note.biz.domain.dataobject.NoteLikeDO; import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; + +import java.util.List; @Mapper public interface NoteLikeDOMapper extends BaseMapper { @@ -13,4 +16,12 @@ public interface NoteLikeDOMapper extends BaseMapper { * @return 影响行数 */ boolean insertOrUpdate(NoteLikeDO noteLikeDO); + + /** + * 批量插入或更新 + * + * @param noteLikeDOS 批量笔记点赞记录 + * @return 影响行数 + */ + int batchInsertOrUpdate(@Param("noteLikeDOS") List noteLikeDOS); } \ No newline at end of file diff --git a/han-note-note/han-note-note-biz/src/main/resources/mapperxml/NoteCollectionDOMapper.xml b/han-note-note/han-note-note-biz/src/main/resources/mapperxml/NoteCollectionDOMapper.xml index b59931d..27b9d98 100644 --- a/han-note-note/han-note-note-biz/src/main/resources/mapperxml/NoteCollectionDOMapper.xml +++ b/han-note-note/han-note-note-biz/src/main/resources/mapperxml/NoteCollectionDOMapper.xml @@ -31,4 +31,13 @@ and note_id = #{noteId} and status = 1 + + + INSERT INTO t_note_collection (user_id, note_id, status, create_time) + VALUES + + (#{item.userId}, #{item.noteId}, #{item.status}, #{item.createTime}) + + ON DUPLICATE KEY UPDATE status = VALUES(status) + \ No newline at end of file diff --git a/han-note-note/han-note-note-biz/src/main/resources/mapperxml/NoteLikeDOMapper.xml b/han-note-note/han-note-note-biz/src/main/resources/mapperxml/NoteLikeDOMapper.xml index 8a172f7..bf738aa 100644 --- a/han-note-note/han-note-note-biz/src/main/resources/mapperxml/NoteLikeDOMapper.xml +++ b/han-note-note/han-note-note-biz/src/main/resources/mapperxml/NoteLikeDOMapper.xml @@ -21,4 +21,13 @@ ON DUPLICATE KEY UPDATE create_time = #{createTime}, status = #{status}; + + + INSERT INTO t_note_like (user_id, note_id, status, create_time) + VALUES + + (#{item.userId}, #{item.noteId}, #{item.status}, #{item.createTime}) + + ON DUPLICATE KEY UPDATE status = VALUES(status) + \ No newline at end of file diff --git a/han-note-user-relation/han-note-user-relation-biz/pom.xml b/han-note-user-relation/han-note-user-relation-biz/pom.xml index 41545a6..6292311 100644 --- a/han-note-user-relation/han-note-user-relation-biz/pom.xml +++ b/han-note-user-relation/han-note-user-relation-biz/pom.xml @@ -108,6 +108,12 @@ spring-cloud-starter-alibaba-nacos-config + + org.apache.rocketmq + rocketmq-client + + +