refactor(mq): 重构笔记收藏与点赞的MQ消费者实现以提升性能和可靠性
- 使用DefaultMQPushConsumer替代RocketMQListener以支持批量消费 - 实现消息批量处理逻辑,减少数据库交互次数 - 添加内存级操作合并,避免重复操作写入数据库 - 配置流量削峰限流,控制数据库QPS在可接受范围 - 增加重试机制和手动ACK确保消息可靠消费 - 调整MQ主题订阅关系,统一消息流转逻辑 - 新增RocketMQ客户端依赖以支持底层API调 - 优化消费者启动和销毁流程,确保资源正确释放 - 修改Mapper支持批量插入或更新操作 - 调整计数服务消费主题,简化消息链路
This commit is contained in:
@@ -2,6 +2,16 @@ package com.hanserwei.hannote.count.biz.constant;
|
||||
|
||||
public interface MQConstants {
|
||||
|
||||
/**
|
||||
* Topic: 计数 - 笔记点赞数
|
||||
*/
|
||||
String TOPIC_LIKE_OR_UNLIKE = "LikeUnlikeTopic";
|
||||
|
||||
/**
|
||||
* Topic: 笔记收藏、取消收藏
|
||||
*/
|
||||
String TOPIC_COLLECT_OR_UN_COLLECT = "CollectUnCollectTopic";
|
||||
|
||||
/**
|
||||
* Topic: 关注数计数
|
||||
*/
|
||||
|
||||
@@ -29,8 +29,8 @@ import java.util.stream.Collectors;
|
||||
@Component
|
||||
@Slf4j
|
||||
@RocketMQMessageListener(
|
||||
consumerGroup = "han_note_group_" + MQConstants.TOPIC_COUNT_NOTE_COLLECT,
|
||||
topic = MQConstants.TOPIC_COUNT_NOTE_COLLECT
|
||||
consumerGroup = "han_note_group_" + MQConstants.TOPIC_COLLECT_OR_UN_COLLECT,
|
||||
topic = MQConstants.TOPIC_COLLECT_OR_UN_COLLECT
|
||||
)
|
||||
public class CountNoteCollectConsumer implements RocketMQListener<String> {
|
||||
|
||||
|
||||
@@ -28,8 +28,8 @@ import java.util.stream.Collectors;
|
||||
@Component
|
||||
@Slf4j
|
||||
@RocketMQMessageListener(
|
||||
consumerGroup = "han_note_group_" + MQConstants.TOPIC_COUNT_NOTE_LIKE,
|
||||
topic = MQConstants.TOPIC_COUNT_NOTE_LIKE
|
||||
consumerGroup = "han_note_group_" + MQConstants.TOPIC_LIKE_OR_UNLIKE,
|
||||
topic = MQConstants.TOPIC_LIKE_OR_UNLIKE
|
||||
)
|
||||
public class CountNoteLikeConsumer implements RocketMQListener<String> {
|
||||
|
||||
|
||||
@@ -118,6 +118,13 @@
|
||||
<artifactId>rocketmq-spring-boot-starter</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- Rocket MQ 客户端 -->
|
||||
<dependency>
|
||||
<groupId>org.apache.rocketmq</groupId>
|
||||
<artifactId>rocketmq-client</artifactId>
|
||||
</dependency>
|
||||
|
||||
|
||||
</dependencies>
|
||||
|
||||
|
||||
|
||||
@@ -1,172 +1,154 @@
|
||||
package com.hanserwei.hannote.note.biz.comsumer;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.RateLimiter;
|
||||
import com.hanserwei.framework.common.utils.JsonUtils;
|
||||
import com.hanserwei.hannote.note.biz.constant.MQConstants;
|
||||
import com.hanserwei.hannote.note.biz.domain.dataobject.NoteCollectionDO;
|
||||
import com.hanserwei.hannote.note.biz.domain.mapper.NoteCollectionDOMapper;
|
||||
import com.hanserwei.hannote.note.biz.model.dto.CollectUnCollectNoteMqDTO;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.client.producer.SendCallback;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.common.message.Message;
|
||||
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
|
||||
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@SuppressWarnings({"UnstableApiUsage", "DuplicatedCode"})
|
||||
@SuppressWarnings({"UnstableApiUsage"})
|
||||
@Component
|
||||
@Slf4j
|
||||
@RocketMQMessageListener(
|
||||
consumerGroup = "han_note_group_" + MQConstants.TOPIC_COLLECT_OR_UN_COLLECT,
|
||||
topic = MQConstants.TOPIC_COLLECT_OR_UN_COLLECT,
|
||||
consumeMode = ConsumeMode.ORDERLY
|
||||
)
|
||||
public class CollectUnCollectNoteConsumer implements RocketMQListener<Message> {
|
||||
public class CollectUnCollectNoteConsumer {
|
||||
|
||||
// 每秒创建 5000 个令牌
|
||||
// 每秒创建5000个令牌
|
||||
private final RateLimiter rateLimiter = RateLimiter.create(5000);
|
||||
@Value("${rocketmq.name-server}")
|
||||
private String nameServer;
|
||||
@Resource
|
||||
private DefaultMQPushConsumer consumer;
|
||||
@Resource
|
||||
private NoteCollectionDOMapper noteCollectionDOMapper;
|
||||
@Resource
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
@Override
|
||||
public void onMessage(Message message) {
|
||||
// 流量削峰:通过获取令牌,如果没有令牌可用,将阻塞,直到获得
|
||||
@Bean(name = "CollectUnCollectNoteConsumer")
|
||||
public DefaultMQPushConsumer mqPushConsumer() throws MQClientException {
|
||||
// Group组
|
||||
String group = "han_note_group_" + MQConstants.TOPIC_COLLECT_OR_UN_COLLECT;
|
||||
|
||||
// 创建一个新的 DefaultMQPushConsumer 实例,并指定消费者的消费组名
|
||||
consumer = new DefaultMQPushConsumer(group);
|
||||
|
||||
// 设置 RocketMQ 的 NameServer 地址
|
||||
consumer.setNamesrvAddr(nameServer);
|
||||
|
||||
// 订阅指定的主题,并设置主题的订阅规则("*" 表示订阅所有标签的消息)
|
||||
consumer.subscribe(MQConstants.TOPIC_COLLECT_OR_UN_COLLECT, "*");
|
||||
|
||||
// 设置消费者消费消息的起始位置,如果队列中没有消息,则从最新的消息开始消费。
|
||||
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
|
||||
|
||||
// 设置消息消费模式,这里使用集群模式 (CLUSTERING)
|
||||
consumer.setMessageModel(MessageModel.CLUSTERING);
|
||||
|
||||
// 最大重试次数, 以防消息重试过多次仍然没有成功,避免消息卡在消费队列中。
|
||||
consumer.setMaxReconsumeTimes(3);
|
||||
// 设置每批次消费的最大消息数量,这里设置为 30,表示每次拉取时最多消费 30 条消息。
|
||||
consumer.setConsumeMessageBatchMaxSize(30);
|
||||
// 设置拉取间隔, 单位毫秒
|
||||
consumer.setPullInterval(1000);
|
||||
|
||||
// 注册消息监听器
|
||||
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
|
||||
log.info("==> 【笔记收藏、取消收藏】本批次消息大小: {}", msgs.size());
|
||||
try {
|
||||
// 令牌桶流控, 以控制数据库能够承受的 QPS
|
||||
rateLimiter.acquire();
|
||||
|
||||
// 幂等性: 通过联合唯一索引保证
|
||||
|
||||
// 消息体
|
||||
String bodyJsonStr = new String(message.getBody());
|
||||
// 标签
|
||||
String tags = message.getTags();
|
||||
// 消息体 Json 字符串转 DTO
|
||||
List<CollectUnCollectNoteMqDTO> collectUnCollectNoteMqDTOS = Lists.newArrayList();
|
||||
msgs.forEach(msg -> {
|
||||
String msgJson = new String(msg.getBody());
|
||||
log.info("==> Consumer - Received message: {}", msgJson);
|
||||
collectUnCollectNoteMqDTOS.add(JsonUtils.parseObject(msgJson, CollectUnCollectNoteMqDTO.class));
|
||||
});
|
||||
|
||||
log.info("==> CollectUnCollectNoteConsumer 消费了消息 {}, tags: {}", bodyJsonStr, tags);
|
||||
// 1.内存级操作合并
|
||||
//按用户ID分组
|
||||
Map<Long, List<CollectUnCollectNoteMqDTO>> groupMap = collectUnCollectNoteMqDTOS.stream()
|
||||
.collect(Collectors.groupingBy(CollectUnCollectNoteMqDTO::getUserId));
|
||||
//对每个用户按照用户ID分组并且过滤合并
|
||||
// 对每个用户的操作按 noteId 二次分组,并过滤合并
|
||||
List<CollectUnCollectNoteMqDTO> finalOperations = groupMap.values().stream()
|
||||
.flatMap(userOperations -> {
|
||||
// 按 noteId 分组
|
||||
Map<Long, List<CollectUnCollectNoteMqDTO>> noteGroupMap = userOperations.stream()
|
||||
.collect(Collectors.groupingBy(CollectUnCollectNoteMqDTO::getNoteId));
|
||||
|
||||
// 根据 MQ 标签,判断操作类型
|
||||
if (Objects.equals(tags, MQConstants.TAG_COLLECT)) { // 收藏笔记
|
||||
handleCollectNoteTagMessage(bodyJsonStr);
|
||||
} else if (Objects.equals(tags, MQConstants.TAG_UN_COLLECT)) { // 取消收藏笔记
|
||||
handleUnCollectNoteTagMessage(bodyJsonStr);
|
||||
}
|
||||
// 处理每个 noteId 的分组
|
||||
// 取最后一次操作(消息是有序的)
|
||||
return noteGroupMap.values().stream()
|
||||
.filter(operations -> {
|
||||
int size = operations.size();
|
||||
// 根据奇偶性判断是否需要处理
|
||||
// 偶数次操作:最终状态抵消,无需写入
|
||||
// 奇数次操作:保留最后一次操作
|
||||
return size % 2 != 0;
|
||||
})
|
||||
.map(List::getLast);
|
||||
})
|
||||
.toList();
|
||||
|
||||
// 2. 批量写入数据库
|
||||
if (CollUtil.isNotEmpty(finalOperations)) {
|
||||
// DTO 转 DO
|
||||
List<NoteCollectionDO> noteCollectionDOS = finalOperations.stream()
|
||||
.map(finalOperation -> NoteCollectionDO.builder()
|
||||
.userId(finalOperation.getUserId())
|
||||
.noteId(finalOperation.getNoteId())
|
||||
.createTime(finalOperation.getCreateTime())
|
||||
.status(finalOperation.getType())
|
||||
.build())
|
||||
.toList();
|
||||
|
||||
// 批量写入
|
||||
noteCollectionDOMapper.batchInsertOrUpdate(noteCollectionDOS);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理取消收藏笔记的 MQ 消息
|
||||
*
|
||||
* @param bodyJsonStr 消息体
|
||||
*/
|
||||
private void handleUnCollectNoteTagMessage(String bodyJsonStr) {
|
||||
// 消息体 JSON 字符串转 DTO
|
||||
CollectUnCollectNoteMqDTO unCollectNoteMqDTO = JsonUtils.parseObject(bodyJsonStr, CollectUnCollectNoteMqDTO.class);
|
||||
|
||||
if (Objects.isNull(unCollectNoteMqDTO)) return;
|
||||
|
||||
// 用户ID
|
||||
Long userId = unCollectNoteMqDTO.getUserId();
|
||||
// 收藏的笔记ID
|
||||
Long noteId = unCollectNoteMqDTO.getNoteId();
|
||||
// 操作类型
|
||||
Integer type = unCollectNoteMqDTO.getType();
|
||||
// 收藏时间
|
||||
LocalDateTime createTime = unCollectNoteMqDTO.getCreateTime();
|
||||
|
||||
// 构建 DO 对象
|
||||
NoteCollectionDO noteCollectionDO = NoteCollectionDO.builder()
|
||||
.userId(userId)
|
||||
.noteId(noteId)
|
||||
.createTime(createTime)
|
||||
.status(type)
|
||||
.build();
|
||||
|
||||
// 取消收藏:记录更新
|
||||
int count = noteCollectionDOMapper.update2UnCollectByUserIdAndNoteId(noteCollectionDO);
|
||||
|
||||
if (count == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 更新数据库成功后,发送计数 MQ
|
||||
org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(bodyJsonStr)
|
||||
.build();
|
||||
|
||||
// 异步发送 MQ 消息
|
||||
rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_COLLECT, message, new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
log.info("==> 【计数: 笔记取消收藏】MQ 发送成功,SendResult: {}", sendResult);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable throwable) {
|
||||
log.error("==> 【计数: 笔记取消收藏】MQ 发送异常: ", throwable);
|
||||
// 手动 ACK,告诉 RocketMQ 这批次消息消费成功
|
||||
return ConsumeOrderlyStatus.SUCCESS;
|
||||
} catch (Exception e) {
|
||||
log.error("", e);
|
||||
// 这样 RocketMQ 会暂停当前队列的消费一段时间,再重试
|
||||
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
|
||||
}
|
||||
});
|
||||
|
||||
// 启动消费者
|
||||
consumer.start();
|
||||
return consumer;
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理收藏笔记的 MQ 消息
|
||||
*
|
||||
* @param bodyJsonStr 消息体
|
||||
*/
|
||||
private void handleCollectNoteTagMessage(String bodyJsonStr) {
|
||||
// 消息体 JSON 字符串转 DTO
|
||||
CollectUnCollectNoteMqDTO collectUnCollectNoteMqDTO = JsonUtils.parseObject(bodyJsonStr, CollectUnCollectNoteMqDTO.class);
|
||||
|
||||
if (Objects.isNull(collectUnCollectNoteMqDTO)) return;
|
||||
|
||||
// 用户ID
|
||||
Long userId = collectUnCollectNoteMqDTO.getUserId();
|
||||
// 收藏的笔记ID
|
||||
Long noteId = collectUnCollectNoteMqDTO.getNoteId();
|
||||
// 操作类型
|
||||
Integer type = collectUnCollectNoteMqDTO.getType();
|
||||
// 收藏时间
|
||||
LocalDateTime createTime = collectUnCollectNoteMqDTO.getCreateTime();
|
||||
|
||||
// 构建 DO 对象
|
||||
NoteCollectionDO noteCollectionDO = NoteCollectionDO.builder()
|
||||
.userId(userId)
|
||||
.noteId(noteId)
|
||||
.createTime(createTime)
|
||||
.status(type)
|
||||
.build();
|
||||
|
||||
// 添加或更新笔记收藏记录
|
||||
boolean isSuccess = noteCollectionDOMapper.insertOrUpdate(noteCollectionDO);
|
||||
|
||||
if (!isSuccess) {
|
||||
return;
|
||||
@PreDestroy
|
||||
public void destroy() {
|
||||
if (Objects.nonNull(consumer)) {
|
||||
try {
|
||||
consumer.shutdown(); // 关闭消费者
|
||||
} catch (Exception e) {
|
||||
log.error("", e);
|
||||
}
|
||||
|
||||
// 发送计数 MQ
|
||||
|
||||
// 更新数据库成功后,发送计数 MQ
|
||||
org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(bodyJsonStr)
|
||||
.build();
|
||||
|
||||
// 异步发送 MQ 消息
|
||||
// 异步发送 MQ 消息
|
||||
rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_COLLECT, message, new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
log.info("==> 【计数: 笔记收藏】MQ 发送成功,SendResult: {}", sendResult);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable throwable) {
|
||||
log.error("==> 【计数: 笔记收藏】MQ 发送异常: ", throwable);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,179 +1,154 @@
|
||||
package com.hanserwei.hannote.note.biz.comsumer;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.RateLimiter;
|
||||
import com.hanserwei.framework.common.utils.JsonUtils;
|
||||
import com.hanserwei.hannote.note.biz.constant.MQConstants;
|
||||
import com.hanserwei.hannote.note.biz.domain.dataobject.NoteLikeDO;
|
||||
import com.hanserwei.hannote.note.biz.domain.mapper.NoteLikeDOMapper;
|
||||
import com.hanserwei.hannote.note.biz.enums.LikeUnlikeNoteTypeEnum;
|
||||
import com.hanserwei.hannote.note.biz.model.dto.LikeUnlikeNoteMqDTO;
|
||||
import com.hanserwei.hannote.note.biz.service.NoteLikeDOService;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.client.producer.SendCallback;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.common.message.Message;
|
||||
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
|
||||
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@SuppressWarnings({"UnstableApiUsage"})
|
||||
@Component
|
||||
@RocketMQMessageListener(
|
||||
consumerGroup = "han_note_group_" + MQConstants.TOPIC_LIKE_OR_UNLIKE,
|
||||
topic = MQConstants.TOPIC_LIKE_OR_UNLIKE,
|
||||
consumeMode = ConsumeMode.ORDERLY// 顺序消费
|
||||
)
|
||||
@Slf4j
|
||||
public class LikeUnlikeNoteConsumer implements RocketMQListener<Message> {
|
||||
public class LikeUnlikeNoteConsumer {
|
||||
|
||||
// 每秒创建 5000 个令牌
|
||||
// 每秒创建5000个令牌
|
||||
private final RateLimiter rateLimiter = RateLimiter.create(5000);
|
||||
@Value("${rocketmq.name-server}")
|
||||
private String nameServer;
|
||||
@Resource
|
||||
private DefaultMQPushConsumer consumer;
|
||||
@Resource
|
||||
private NoteLikeDOMapper noteLikeDOMapper;
|
||||
@Resource
|
||||
private NoteLikeDOService noteLikeDOService;
|
||||
@Resource
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
@Override
|
||||
public void onMessage(Message message) {
|
||||
// 流量削峰:通过获取令牌,如果没有令牌可用,将阻塞,直到获得
|
||||
@Bean(name = "LikeUnLikeNoteConsumer")
|
||||
public DefaultMQPushConsumer mqPushConsumer() throws MQClientException {
|
||||
// Group组
|
||||
String group = "han_note_group_" + MQConstants.TOPIC_LIKE_OR_UNLIKE;
|
||||
|
||||
// 创建一个新的 DefaultMQPushConsumer 实例,并指定消费者的消费组名
|
||||
consumer = new DefaultMQPushConsumer(group);
|
||||
|
||||
// 设置 RocketMQ 的 NameServer 地址
|
||||
consumer.setNamesrvAddr(nameServer);
|
||||
|
||||
// 订阅指定的主题,并设置主题的订阅规则("*" 表示订阅所有标签的消息)
|
||||
consumer.subscribe(MQConstants.TOPIC_LIKE_OR_UNLIKE, "*");
|
||||
|
||||
// 设置消费者消费消息的起始位置,如果队列中没有消息,则从最新的消息开始消费。
|
||||
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
|
||||
|
||||
// 设置消息消费模式,这里使用集群模式 (CLUSTERING)
|
||||
consumer.setMessageModel(MessageModel.CLUSTERING);
|
||||
|
||||
// 最大重试次数, 以防消息重试过多次仍然没有成功,避免消息卡在消费队列中。
|
||||
consumer.setMaxReconsumeTimes(3);
|
||||
// 设置每批次消费的最大消息数量,这里设置为 30,表示每次拉取时最多消费 30 条消息。
|
||||
consumer.setConsumeMessageBatchMaxSize(30);
|
||||
// 设置拉取间隔, 单位毫秒
|
||||
consumer.setPullInterval(1000);
|
||||
|
||||
// 注册消息监听器
|
||||
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
|
||||
log.info("==> 【笔记点赞、取消点赞】本批次消息大小: {}", msgs.size());
|
||||
try {
|
||||
// 令牌桶流控, 以控制数据库能够承受的 QPS
|
||||
rateLimiter.acquire();
|
||||
|
||||
// 幂等性,通过联合索引保证
|
||||
// 幂等性: 通过联合唯一索引保证
|
||||
|
||||
// 消息体
|
||||
String bodyJsonStr = new String(message.getBody());
|
||||
// 标签
|
||||
String tags = message.getTags();
|
||||
|
||||
log.info("==> LikeUnlikeNoteConsumer 消费了消息 {}, tags: {}", bodyJsonStr, tags);
|
||||
|
||||
// 根据 MQ 标签,判断操作类型
|
||||
if (Objects.equals(tags, MQConstants.TAG_LIKE)) { // 点赞笔记
|
||||
handleLikeNoteTagMessage(bodyJsonStr);
|
||||
} else if (Objects.equals(tags, MQConstants.TAG_UNLIKE)) { // 取消点赞笔记
|
||||
handleUnlikeNoteTagMessage(bodyJsonStr);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理取消点赞笔记的 MQ 消息
|
||||
*
|
||||
* @param bodyJsonStr 消息体
|
||||
*/
|
||||
private void handleUnlikeNoteTagMessage(String bodyJsonStr) {
|
||||
// 消息体 JSON 字符串转 DTO
|
||||
LikeUnlikeNoteMqDTO unlikeNoteMqDTO = JsonUtils.parseObject(bodyJsonStr, LikeUnlikeNoteMqDTO.class);
|
||||
if (Objects.isNull(unlikeNoteMqDTO)) {
|
||||
return;
|
||||
}
|
||||
// 用户ID
|
||||
Long userId = unlikeNoteMqDTO.getUserId();
|
||||
// 点赞的笔记ID
|
||||
Long noteId = unlikeNoteMqDTO.getNoteId();
|
||||
// 操作类型
|
||||
Integer type = unlikeNoteMqDTO.getType();
|
||||
// 取消点赞时间
|
||||
LocalDateTime createTime = unlikeNoteMqDTO.getCreateTime();
|
||||
|
||||
// 设置要更新的字段值
|
||||
NoteLikeDO updateEntity = NoteLikeDO.builder()
|
||||
.createTime(createTime) // 更新时间
|
||||
.status(type) // 设置新的状态值 (例如 0 表示取消点赞)
|
||||
.build();
|
||||
|
||||
// 设置更新条件:where user_id = [userId] and note_id = [noteId] and status = 1
|
||||
LambdaQueryWrapper<NoteLikeDO> wrapper = new LambdaQueryWrapper<>();
|
||||
wrapper.eq(NoteLikeDO::getUserId, userId)
|
||||
.eq(NoteLikeDO::getNoteId, noteId)
|
||||
.eq(NoteLikeDO::getStatus, LikeUnlikeNoteTypeEnum.LIKE.getCode()); // 确保只更新当前为“已点赞”的记录
|
||||
|
||||
// 执行更新
|
||||
boolean update = noteLikeDOService.update(updateEntity, wrapper);
|
||||
log.info("==> 【取消点赞笔记】更新数据库成功,update: {}", update);
|
||||
|
||||
if (!update) {
|
||||
return;
|
||||
}
|
||||
// 更新数据库成功后,发送计数 MQ
|
||||
org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(bodyJsonStr)
|
||||
.build();
|
||||
|
||||
// 异步发送 MQ 消息
|
||||
rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_LIKE, message, new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
log.info("==> 【计数: 笔记取消点赞】MQ 发送成功,SendResult: {}", sendResult);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable throwable) {
|
||||
log.error("==> 【计数: 笔记取消点赞】MQ 发送异常: ", throwable);
|
||||
}
|
||||
// 消息体 Json 字符串转 DTO
|
||||
List<LikeUnlikeNoteMqDTO> likeUnlikeNoteMqDTOS = Lists.newArrayList();
|
||||
msgs.forEach(msg -> {
|
||||
String msgJson = new String(msg.getBody());
|
||||
log.info("==> Consumer - Received message: {}", msgJson);
|
||||
likeUnlikeNoteMqDTOS.add(JsonUtils.parseObject(msgJson, LikeUnlikeNoteMqDTO.class));
|
||||
});
|
||||
|
||||
// 1.内存级操作合并
|
||||
//按用户ID分组
|
||||
Map<Long, List<LikeUnlikeNoteMqDTO>> groupMap = likeUnlikeNoteMqDTOS.stream()
|
||||
.collect(Collectors.groupingBy(LikeUnlikeNoteMqDTO::getUserId));
|
||||
//对每个用户按照用户ID分组并且过滤合并
|
||||
// 对每个用户的操作按 noteId 二次分组,并过滤合并
|
||||
List<LikeUnlikeNoteMqDTO> finalOperations = groupMap.values().stream()
|
||||
.flatMap(userOperations -> {
|
||||
// 按 noteId 分组
|
||||
Map<Long, List<LikeUnlikeNoteMqDTO>> noteGroupMap = userOperations.stream()
|
||||
.collect(Collectors.groupingBy(LikeUnlikeNoteMqDTO::getNoteId));
|
||||
|
||||
// 处理每个 noteId 的分组
|
||||
// 取最后一次操作(消息是有序的)
|
||||
return noteGroupMap.values().stream()
|
||||
.filter(operations -> {
|
||||
int size = operations.size();
|
||||
// 根据奇偶性判断是否需要处理
|
||||
// 偶数次操作:最终状态抵消,无需写入
|
||||
// 奇数次操作:保留最后一次操作
|
||||
return size % 2 != 0;
|
||||
})
|
||||
.map(List::getLast);
|
||||
})
|
||||
.toList();
|
||||
|
||||
// 2. 批量写入数据库
|
||||
if (CollUtil.isNotEmpty(finalOperations)) {
|
||||
// DTO 转 DO
|
||||
List<NoteLikeDO> noteLikeDOS = finalOperations.stream()
|
||||
.map(finalOperation -> NoteLikeDO.builder()
|
||||
.userId(finalOperation.getUserId())
|
||||
.noteId(finalOperation.getNoteId())
|
||||
.createTime(finalOperation.getCreateTime())
|
||||
.status(finalOperation.getType())
|
||||
.build())
|
||||
.toList();
|
||||
|
||||
// 批量写入
|
||||
noteLikeDOMapper.batchInsertOrUpdate(noteLikeDOS);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理点赞笔记的 MQ 消息
|
||||
*
|
||||
* @param bodyJsonStr 消息体
|
||||
*/
|
||||
private void handleLikeNoteTagMessage(String bodyJsonStr) {
|
||||
// 消息体 JSON 字符串转 DTO
|
||||
LikeUnlikeNoteMqDTO likeNoteMqDTO = JsonUtils.parseObject(bodyJsonStr, LikeUnlikeNoteMqDTO.class);
|
||||
|
||||
if (Objects.isNull(likeNoteMqDTO)) return;
|
||||
|
||||
// 用户ID
|
||||
Long userId = likeNoteMqDTO.getUserId();
|
||||
// 点赞的笔记ID
|
||||
Long noteId = likeNoteMqDTO.getNoteId();
|
||||
// 操作类型
|
||||
Integer type = likeNoteMqDTO.getType();
|
||||
// 点赞时间
|
||||
LocalDateTime createTime = likeNoteMqDTO.getCreateTime();
|
||||
|
||||
// 构建 DO 对象
|
||||
NoteLikeDO noteLikeDO = NoteLikeDO.builder()
|
||||
.userId(userId)
|
||||
.noteId(noteId)
|
||||
.createTime(createTime)
|
||||
.status(type)
|
||||
.build();
|
||||
|
||||
// 添加或更新笔记点赞记录
|
||||
boolean count = noteLikeDOMapper.insertOrUpdate(noteLikeDO);
|
||||
|
||||
if (!count) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 发送计数 MQ
|
||||
// 更新数据库成功后,发送计数 MQ
|
||||
org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(bodyJsonStr)
|
||||
.build();
|
||||
|
||||
// 异步发送 MQ 消息
|
||||
rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_LIKE, message, new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
log.info("==> 【计数: 笔记点赞】MQ 发送成功,SendResult: {}", sendResult);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable throwable) {
|
||||
log.error("==> 【计数: 笔记点赞】MQ 发送异常: ", throwable);
|
||||
// 手动 ACK,告诉 RocketMQ 这批次消息消费成功
|
||||
return ConsumeOrderlyStatus.SUCCESS;
|
||||
} catch (Exception e) {
|
||||
log.error("", e);
|
||||
// 这样 RocketMQ 会暂停当前队列的消费一段时间,再重试
|
||||
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
|
||||
}
|
||||
});
|
||||
|
||||
// 启动消费者
|
||||
consumer.start();
|
||||
return consumer;
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void destroy() {
|
||||
if (Objects.nonNull(consumer)) {
|
||||
try {
|
||||
consumer.shutdown(); // 关闭消费者
|
||||
} catch (Exception e) {
|
||||
log.error("", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,21 +17,11 @@ public interface MQConstants {
|
||||
*/
|
||||
String TOPIC_LIKE_OR_UNLIKE = "LikeUnlikeTopic";
|
||||
|
||||
/**
|
||||
* Topic: 计数 - 笔记点赞数
|
||||
*/
|
||||
String TOPIC_COUNT_NOTE_LIKE = "CountNoteLikeTopic";
|
||||
|
||||
/**
|
||||
* Topic: 收藏、取消收藏共用一个
|
||||
*/
|
||||
String TOPIC_COLLECT_OR_UN_COLLECT = "CollectUnCollectTopic";
|
||||
|
||||
/**
|
||||
* Topic: 计数 - 笔记收藏数
|
||||
*/
|
||||
String TOPIC_COUNT_NOTE_COLLECT = "CountNoteCollectTopic";
|
||||
|
||||
/**
|
||||
* Topic: 笔记操作(发布、删除)
|
||||
*/
|
||||
|
||||
@@ -3,6 +3,9 @@ package com.hanserwei.hannote.note.biz.domain.mapper;
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import com.hanserwei.hannote.note.biz.domain.dataobject.NoteCollectionDO;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
import org.apache.ibatis.annotations.Param;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Mapper
|
||||
public interface NoteCollectionDOMapper extends BaseMapper<NoteCollectionDO> {
|
||||
@@ -21,4 +24,12 @@ public interface NoteCollectionDOMapper extends BaseMapper<NoteCollectionDO> {
|
||||
* @return 影响行数
|
||||
*/
|
||||
int update2UnCollectByUserIdAndNoteId(NoteCollectionDO noteCollectionDO);
|
||||
|
||||
/**
|
||||
* 批量新增笔记收藏记录
|
||||
*
|
||||
* @param noteCollectionDOS 笔记收藏记录
|
||||
* @return 影响行数
|
||||
*/
|
||||
int batchInsertOrUpdate(@Param("noteCollectionDOS") List<NoteCollectionDO> noteCollectionDOS);
|
||||
}
|
||||
@@ -3,6 +3,9 @@ package com.hanserwei.hannote.note.biz.domain.mapper;
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import com.hanserwei.hannote.note.biz.domain.dataobject.NoteLikeDO;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
import org.apache.ibatis.annotations.Param;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Mapper
|
||||
public interface NoteLikeDOMapper extends BaseMapper<NoteLikeDO> {
|
||||
@@ -13,4 +16,12 @@ public interface NoteLikeDOMapper extends BaseMapper<NoteLikeDO> {
|
||||
* @return 影响行数
|
||||
*/
|
||||
boolean insertOrUpdate(NoteLikeDO noteLikeDO);
|
||||
|
||||
/**
|
||||
* 批量插入或更新
|
||||
*
|
||||
* @param noteLikeDOS 批量笔记点赞记录
|
||||
* @return 影响行数
|
||||
*/
|
||||
int batchInsertOrUpdate(@Param("noteLikeDOS") List<NoteLikeDO> noteLikeDOS);
|
||||
}
|
||||
@@ -31,4 +31,13 @@
|
||||
and note_id = #{noteId}
|
||||
and status = 1
|
||||
</update>
|
||||
|
||||
<insert id="batchInsertOrUpdate" parameterType="list">
|
||||
INSERT INTO t_note_collection (user_id, note_id, status, create_time)
|
||||
VALUES
|
||||
<foreach item="item" collection="noteCollectionDOS" separator=",">
|
||||
(#{item.userId}, #{item.noteId}, #{item.status}, #{item.createTime})
|
||||
</foreach>
|
||||
ON DUPLICATE KEY UPDATE status = VALUES(status)
|
||||
</insert>
|
||||
</mapper>
|
||||
@@ -21,4 +21,13 @@
|
||||
ON DUPLICATE KEY UPDATE
|
||||
create_time = #{createTime}, status = #{status};
|
||||
</insert>
|
||||
|
||||
<insert id="batchInsertOrUpdate" parameterType="list">
|
||||
INSERT INTO t_note_like (user_id, note_id, status, create_time)
|
||||
VALUES
|
||||
<foreach item="item" collection="noteLikeDOS" separator=",">
|
||||
(#{item.userId}, #{item.noteId}, #{item.status}, #{item.createTime})
|
||||
</foreach>
|
||||
ON DUPLICATE KEY UPDATE status = VALUES(status)
|
||||
</insert>
|
||||
</mapper>
|
||||
@@ -108,6 +108,12 @@
|
||||
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.rocketmq</groupId>
|
||||
<artifactId>rocketmq-client</artifactId>
|
||||
</dependency>
|
||||
|
||||
|
||||
</dependencies>
|
||||
<build>
|
||||
<plugins>
|
||||
|
||||
Reference in New Issue
Block a user