feat(note): 引入 RocketMQ 实现缓存一致性
- 新增 RocketMQ 配置类,引入自动配置 - 添加 RocketMQ 依赖到 pom.xml 文件 - 定义 MQ 常量接口,包括删除本地缓存和延迟删除 Redis 缓存的主题 - 实现延迟删除 Redis 缓存的消息消费者 - 实现删除本地缓存的广播模式消息消费者 - 在笔记更新服务中集成 RocketMQ,实现延迟双删策略 - 发送异步延时消息用于最终删除 Redis 缓存 - 发送同步广播消息以清除所有实例中的本地缓存
This commit is contained in:
@@ -0,0 +1,31 @@
|
||||
package com.hanserwei.hannote.note.biz.comsumer;
|
||||
|
||||
import com.hanserwei.hannote.note.biz.constant.MQConstants;
|
||||
import com.hanserwei.hannote.note.biz.constant.RedisKeyConstants;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@RocketMQMessageListener(consumerGroup = "han_note_group_" + MQConstants.TOPIC_DELAY_DELETE_NOTE_REDIS_CACHE, // Group
|
||||
topic = MQConstants.TOPIC_DELAY_DELETE_NOTE_REDIS_CACHE // 消费的主题 Topic
|
||||
)
|
||||
public class DelayDeleteNoteRedisCacheConsumer implements RocketMQListener<String> {
|
||||
|
||||
@Resource
|
||||
private RedisTemplate<String, Object> redisTemplate;
|
||||
|
||||
@Override
|
||||
public void onMessage(String body) {
|
||||
Long noteId = Long.valueOf(body);
|
||||
log.info("## 延迟消息消费成功, noteId: {}", noteId);
|
||||
|
||||
// 删除 Redis 笔记缓存
|
||||
String noteDetailRedisKey = RedisKeyConstants.buildNoteDetailKey(noteId);
|
||||
redisTemplate.delete(noteDetailRedisKey);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,23 @@
|
||||
package com.hanserwei.hannote.note.biz.comsumer;
|
||||
|
||||
import com.hanserwei.hannote.note.biz.constant.MQConstants;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.MessageModel;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@RocketMQMessageListener(
|
||||
consumerGroup = "han_note_group",
|
||||
topic = MQConstants.TOPIC_DELETE_NOTE_LOCAL_CACHE,
|
||||
messageModel = MessageModel.BROADCASTING
|
||||
)
|
||||
public class DeleteNoteLocalCacheConsumer implements RocketMQListener<String> {
|
||||
@Override
|
||||
public void onMessage(String body) {
|
||||
Long noteId = Long.valueOf(body);
|
||||
log.info("## 消费者消费成功, noteId: {}", noteId);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
package com.hanserwei.hannote.note.biz.config;
|
||||
|
||||
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Import;
|
||||
|
||||
@Configuration
|
||||
@Import(RocketMQAutoConfiguration.class)
|
||||
public class RocketMQConfig {
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
package com.hanserwei.hannote.note.biz.constant;
|
||||
|
||||
public interface MQConstants {
|
||||
|
||||
/**
|
||||
* Topic 主题:删除笔记的本地缓存
|
||||
*/
|
||||
String TOPIC_DELETE_NOTE_LOCAL_CACHE = "DeleteNoteLocalCacheTopic";
|
||||
|
||||
/**
|
||||
* Topic 主题:延迟双删 Redis 笔记缓存
|
||||
*/
|
||||
String TOPIC_DELAY_DELETE_NOTE_REDIS_CACHE = "DelayDeleteNoteRedisCacheTopic";
|
||||
}
|
||||
@@ -11,6 +11,7 @@ import com.hanserwei.framework.biz.context.holder.LoginUserContextHolder;
|
||||
import com.hanserwei.framework.common.exception.ApiException;
|
||||
import com.hanserwei.framework.common.response.Response;
|
||||
import com.hanserwei.framework.common.utils.JsonUtils;
|
||||
import com.hanserwei.hannote.note.biz.constant.MQConstants;
|
||||
import com.hanserwei.hannote.note.biz.constant.RedisKeyConstants;
|
||||
import com.hanserwei.hannote.note.biz.domain.dataobject.NoteDO;
|
||||
import com.hanserwei.hannote.note.biz.domain.dataobject.TopicDO;
|
||||
@@ -33,7 +34,12 @@ import jakarta.annotation.Resource;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.rocketmq.client.producer.SendCallback;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
@@ -60,6 +66,8 @@ public class NoteServiceImpl extends ServiceImpl<NoteDOMapper, NoteDO> implement
|
||||
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
|
||||
@Resource
|
||||
private RedisTemplate<String, String> redisTemplate;
|
||||
@Resource
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
/**
|
||||
* 笔记详情本地缓存
|
||||
@@ -344,6 +352,10 @@ public class NoteServiceImpl extends ServiceImpl<NoteDOMapper, NoteDO> implement
|
||||
}
|
||||
}
|
||||
|
||||
// 删除 Redis 缓存
|
||||
String noteDetailRedisKey = RedisKeyConstants.buildNoteDetailKey(noteId);
|
||||
redisTemplate.delete(noteDetailRedisKey);
|
||||
|
||||
// 更新笔记元数据表
|
||||
String content = updateNoteReqVO.getContent();
|
||||
NoteDO noteDO = NoteDO.builder()
|
||||
@@ -361,13 +373,39 @@ public class NoteServiceImpl extends ServiceImpl<NoteDOMapper, NoteDO> implement
|
||||
if (!updateResult){
|
||||
throw new ApiException(ResponseCodeEnum.NOTE_UPDATE_FAIL);
|
||||
}
|
||||
|
||||
// 一致性保证:延迟双删策略
|
||||
// 异步发送延时消息
|
||||
Message<String> message = MessageBuilder.withPayload(String.valueOf(noteId))
|
||||
.build();
|
||||
|
||||
rocketMQTemplate.asyncSend(MQConstants.TOPIC_DELAY_DELETE_NOTE_REDIS_CACHE, message,
|
||||
new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
log.info("## 延时删除 Redis 笔记缓存消息发送成功...");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable e) {
|
||||
log.error("## 延时删除 Redis 笔记缓存消息发送失败...", e);
|
||||
}
|
||||
},
|
||||
3000, // 超时时间(毫秒)
|
||||
1 // 延迟级别,1 表示延时 1s
|
||||
);
|
||||
|
||||
// 删除Redis缓存
|
||||
String noteDetailRedisKey = RedisKeyConstants.buildNoteDetailKey(noteId);
|
||||
noteDetailRedisKey = RedisKeyConstants.buildNoteDetailKey(noteId);
|
||||
redisTemplate.delete(noteDetailRedisKey);
|
||||
|
||||
// 删除本地缓存
|
||||
LOCAL_CACHE.invalidate(noteId);
|
||||
|
||||
// 同步发送广播模式 MQ,将所有实例中的本地缓存都删除掉
|
||||
rocketMQTemplate.syncSend(MQConstants.TOPIC_DELETE_NOTE_LOCAL_CACHE, noteId);
|
||||
log.info("====> MQ:删除笔记本地缓存发送成功...");
|
||||
|
||||
// 笔记内容更新
|
||||
// 查询此篇笔记内容对应的 UUID
|
||||
NoteDO noteDO1 = this.getById(noteId);
|
||||
|
||||
Reference in New Issue
Block a user