feat(note): 实现笔记收藏与取消收藏功能
- 新增收藏/取消收藏 MQ 消费者 CollectUnCollectNoteConsumer - 新增 MQ 消息 DTO 类 CollectUnCollectNoteMqDTO - 新增收藏操作类型枚举 CollectUnCollectNoteTypeEnum - 在 MQConstants 中新增收藏相关主题与标签常量 - 扩展 NoteCollectionDOMapper 支持插入或更新收藏记录 - 在 NoteCollectionDOMapper.xml 中实现 insertOrUpdate SQL 逻辑 - 在 NoteServiceImpl 中构建并发送收藏 MQ 消息 - 添加流量削峰限流与幂等性处理机制
This commit is contained in:
@@ -0,0 +1,99 @@
|
|||||||
|
package com.hanserwei.hannote.note.biz.comsumer;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.RateLimiter;
|
||||||
|
import com.hanserwei.framework.common.utils.JsonUtils;
|
||||||
|
import com.hanserwei.hannote.note.biz.constant.MQConstants;
|
||||||
|
import com.hanserwei.hannote.note.biz.domain.dataobject.NoteCollectionDO;
|
||||||
|
import com.hanserwei.hannote.note.biz.domain.mapper.NoteCollectionDOMapper;
|
||||||
|
import com.hanserwei.hannote.note.biz.model.dto.CollectUnCollectNoteMqDTO;
|
||||||
|
import jakarta.annotation.Resource;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.rocketmq.common.message.Message;
|
||||||
|
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||||
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||||
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
@SuppressWarnings("UnstableApiUsage")
|
||||||
|
@Component
|
||||||
|
@Slf4j
|
||||||
|
@RocketMQMessageListener(
|
||||||
|
consumerGroup = "han_note_" + MQConstants.TOPIC_COLLECT_OR_UN_COLLECT,
|
||||||
|
topic = MQConstants.TOPIC_COLLECT_OR_UN_COLLECT,
|
||||||
|
consumeMode = ConsumeMode.ORDERLY
|
||||||
|
)
|
||||||
|
public class CollectUnCollectNoteConsumer implements RocketMQListener<Message> {
|
||||||
|
|
||||||
|
// 每秒创建 5000 个令牌
|
||||||
|
private final RateLimiter rateLimiter = RateLimiter.create(5000);
|
||||||
|
@Resource
|
||||||
|
private NoteCollectionDOMapper noteCollectionDOMapper;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onMessage(Message message) {
|
||||||
|
// 流量削峰:通过获取令牌,如果没有令牌可用,将阻塞,直到获得
|
||||||
|
rateLimiter.acquire();
|
||||||
|
|
||||||
|
// 幂等性: 通过联合唯一索引保证
|
||||||
|
|
||||||
|
// 消息体
|
||||||
|
String bodyJsonStr = new String(message.getBody());
|
||||||
|
// 标签
|
||||||
|
String tags = message.getTags();
|
||||||
|
|
||||||
|
log.info("==> CollectUnCollectNoteConsumer 消费了消息 {}, tags: {}", bodyJsonStr, tags);
|
||||||
|
|
||||||
|
// 根据 MQ 标签,判断操作类型
|
||||||
|
if (Objects.equals(tags, MQConstants.TAG_COLLECT)) { // 收藏笔记
|
||||||
|
handleCollectNoteTagMessage(bodyJsonStr);
|
||||||
|
} else if (Objects.equals(tags, MQConstants.TAG_UN_COLLECT)) { // 取消收藏笔记
|
||||||
|
handleUnCollectNoteTagMessage(bodyJsonStr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理取消收藏笔记的 MQ 消息
|
||||||
|
*
|
||||||
|
* @param bodyJsonStr 消息体
|
||||||
|
*/
|
||||||
|
private void handleUnCollectNoteTagMessage(String bodyJsonStr) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理收藏笔记的 MQ 消息
|
||||||
|
*
|
||||||
|
* @param bodyJsonStr 消息体
|
||||||
|
*/
|
||||||
|
private void handleCollectNoteTagMessage(String bodyJsonStr) {
|
||||||
|
// 消息体 JSON 字符串转 DTO
|
||||||
|
CollectUnCollectNoteMqDTO collectUnCollectNoteMqDTO = JsonUtils.parseObject(bodyJsonStr, CollectUnCollectNoteMqDTO.class);
|
||||||
|
|
||||||
|
if (Objects.isNull(collectUnCollectNoteMqDTO)) return;
|
||||||
|
|
||||||
|
// 用户ID
|
||||||
|
Long userId = collectUnCollectNoteMqDTO.getUserId();
|
||||||
|
// 收藏的笔记ID
|
||||||
|
Long noteId = collectUnCollectNoteMqDTO.getNoteId();
|
||||||
|
// 操作类型
|
||||||
|
Integer type = collectUnCollectNoteMqDTO.getType();
|
||||||
|
// 收藏时间
|
||||||
|
LocalDateTime createTime = collectUnCollectNoteMqDTO.getCreateTime();
|
||||||
|
|
||||||
|
// 构建 DO 对象
|
||||||
|
NoteCollectionDO noteCollectionDO = NoteCollectionDO.builder()
|
||||||
|
.userId(userId)
|
||||||
|
.noteId(noteId)
|
||||||
|
.createTime(createTime)
|
||||||
|
.status(type)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// 添加或更新笔记收藏记录
|
||||||
|
boolean isSuccess = noteCollectionDOMapper.insertOrUpdate(noteCollectionDO);
|
||||||
|
|
||||||
|
// TODO: 发送计数 MQ
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -22,6 +22,11 @@ public interface MQConstants {
|
|||||||
*/
|
*/
|
||||||
String TOPIC_COUNT_NOTE_LIKE = "CountNoteLikeTopic";
|
String TOPIC_COUNT_NOTE_LIKE = "CountNoteLikeTopic";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Topic: 收藏、取消收藏共用一个
|
||||||
|
*/
|
||||||
|
String TOPIC_COLLECT_OR_UN_COLLECT = "CollectUnCollectTopic";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 点赞标签
|
* 点赞标签
|
||||||
*/
|
*/
|
||||||
@@ -31,4 +36,14 @@ public interface MQConstants {
|
|||||||
* Tag 标签:取消点赞
|
* Tag 标签:取消点赞
|
||||||
*/
|
*/
|
||||||
String TAG_UNLIKE = "Unlike";
|
String TAG_UNLIKE = "Unlike";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tag 标签:收藏
|
||||||
|
*/
|
||||||
|
String TAG_COLLECT = "Collect";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tag 标签:取消收藏
|
||||||
|
*/
|
||||||
|
String TAG_UN_COLLECT = "UnCollect";
|
||||||
}
|
}
|
||||||
@@ -6,4 +6,11 @@ import org.apache.ibatis.annotations.Mapper;
|
|||||||
|
|
||||||
@Mapper
|
@Mapper
|
||||||
public interface NoteCollectionDOMapper extends BaseMapper<NoteCollectionDO> {
|
public interface NoteCollectionDOMapper extends BaseMapper<NoteCollectionDO> {
|
||||||
|
/**
|
||||||
|
* 新增笔记收藏记录,若已存在,则更新笔记收藏记录
|
||||||
|
*
|
||||||
|
* @param noteCollectionDO 笔记收藏记录
|
||||||
|
* @return 是否成功
|
||||||
|
*/
|
||||||
|
boolean insertOrUpdate(NoteCollectionDO noteCollectionDO);
|
||||||
}
|
}
|
||||||
@@ -0,0 +1,17 @@
|
|||||||
|
package com.hanserwei.hannote.note.biz.enums;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Getter;
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
@AllArgsConstructor
|
||||||
|
public enum CollectUnCollectNoteTypeEnum {
|
||||||
|
// 收藏
|
||||||
|
COLLECT(1),
|
||||||
|
// 取消收藏
|
||||||
|
UN_COLLECT(0),
|
||||||
|
;
|
||||||
|
|
||||||
|
private final Integer code;
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,26 @@
|
|||||||
|
package com.hanserwei.hannote.note.biz.model.dto;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Builder;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@AllArgsConstructor
|
||||||
|
@NoArgsConstructor
|
||||||
|
@Builder
|
||||||
|
public class CollectUnCollectNoteMqDTO {
|
||||||
|
|
||||||
|
private Long userId;
|
||||||
|
|
||||||
|
private Long noteId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 0: 取消收藏, 1:收藏
|
||||||
|
*/
|
||||||
|
private Integer type;
|
||||||
|
|
||||||
|
private LocalDateTime createTime;
|
||||||
|
}
|
||||||
@@ -22,6 +22,7 @@ import com.hanserwei.hannote.note.biz.domain.dataobject.NoteLikeDO;
|
|||||||
import com.hanserwei.hannote.note.biz.domain.dataobject.TopicDO;
|
import com.hanserwei.hannote.note.biz.domain.dataobject.TopicDO;
|
||||||
import com.hanserwei.hannote.note.biz.domain.mapper.NoteDOMapper;
|
import com.hanserwei.hannote.note.biz.domain.mapper.NoteDOMapper;
|
||||||
import com.hanserwei.hannote.note.biz.enums.*;
|
import com.hanserwei.hannote.note.biz.enums.*;
|
||||||
|
import com.hanserwei.hannote.note.biz.model.dto.CollectUnCollectNoteMqDTO;
|
||||||
import com.hanserwei.hannote.note.biz.model.dto.LikeUnlikeNoteMqDTO;
|
import com.hanserwei.hannote.note.biz.model.dto.LikeUnlikeNoteMqDTO;
|
||||||
import com.hanserwei.hannote.note.biz.model.vo.*;
|
import com.hanserwei.hannote.note.biz.model.vo.*;
|
||||||
import com.hanserwei.hannote.note.biz.rpc.DistributedIdGeneratorRpcService;
|
import com.hanserwei.hannote.note.biz.rpc.DistributedIdGeneratorRpcService;
|
||||||
@@ -924,7 +925,36 @@ public class NoteServiceImpl extends ServiceImpl<NoteDOMapper, NoteDO> implement
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// TODO: 4. 发送 MQ, 将收藏数据落库
|
// 4. 发送 MQ, 将收藏数据落库
|
||||||
|
// 构建消息体 DTO
|
||||||
|
CollectUnCollectNoteMqDTO collectUnCollectNoteMqDTO = CollectUnCollectNoteMqDTO.builder()
|
||||||
|
.userId(userId)
|
||||||
|
.noteId(noteId)
|
||||||
|
.type(CollectUnCollectNoteTypeEnum.COLLECT.getCode()) // 收藏笔记
|
||||||
|
.createTime(now)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// 构建消息对象,并将 DTO 转成 Json 字符串设置到消息体中
|
||||||
|
Message<String> message = MessageBuilder.withPayload(JsonUtils.toJsonString(collectUnCollectNoteMqDTO))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// 通过冒号连接, 可让 MQ 发送给主题 Topic 时,携带上标签 Tag
|
||||||
|
String destination = MQConstants.TOPIC_COLLECT_OR_UN_COLLECT + ":" + MQConstants.TAG_COLLECT;
|
||||||
|
|
||||||
|
String hashKey = String.valueOf(userId);
|
||||||
|
|
||||||
|
// 异步发送顺序 MQ 消息,提升接口响应速度
|
||||||
|
rocketMQTemplate.asyncSendOrderly(destination, message, hashKey, new SendCallback() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(SendResult sendResult) {
|
||||||
|
log.info("==> 【笔记收藏】MQ 发送成功,SendResult: {}", sendResult);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onException(Throwable throwable) {
|
||||||
|
log.error("==> 【笔记收藏】MQ 发送异常: ", throwable);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
return Response.success();
|
return Response.success();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,4 +14,11 @@
|
|||||||
<!--@mbg.generated-->
|
<!--@mbg.generated-->
|
||||||
id, user_id, note_id, create_time, `status`
|
id, user_id, note_id, create_time, `status`
|
||||||
</sql>
|
</sql>
|
||||||
|
|
||||||
|
<insert id="insertOrUpdate" parameterType="com.hanserwei.hannote.note.biz.domain.dataobject.NoteCollectionDO">
|
||||||
|
INSERT INTO t_note_collection (user_id, note_id, create_time, status)
|
||||||
|
VALUES (#{userId}, #{noteId}, #{createTime}, #{status})
|
||||||
|
ON DUPLICATE KEY UPDATE
|
||||||
|
create_time = #{createTime}, status = #{status};
|
||||||
|
</insert>
|
||||||
</mapper>
|
</mapper>
|
||||||
Reference in New Issue
Block a user