diff --git a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/CollectUnCollectNoteConsumer.java b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/CollectUnCollectNoteConsumer.java new file mode 100644 index 0000000..7cbb117 --- /dev/null +++ b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/CollectUnCollectNoteConsumer.java @@ -0,0 +1,99 @@ +package com.hanserwei.hannote.note.biz.comsumer; + +import com.google.common.util.concurrent.RateLimiter; +import com.hanserwei.framework.common.utils.JsonUtils; +import com.hanserwei.hannote.note.biz.constant.MQConstants; +import com.hanserwei.hannote.note.biz.domain.dataobject.NoteCollectionDO; +import com.hanserwei.hannote.note.biz.domain.mapper.NoteCollectionDOMapper; +import com.hanserwei.hannote.note.biz.model.dto.CollectUnCollectNoteMqDTO; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.spring.annotation.ConsumeMode; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; +import java.util.Objects; + +@SuppressWarnings("UnstableApiUsage") +@Component +@Slf4j +@RocketMQMessageListener( + consumerGroup = "han_note_" + MQConstants.TOPIC_COLLECT_OR_UN_COLLECT, + topic = MQConstants.TOPIC_COLLECT_OR_UN_COLLECT, + consumeMode = ConsumeMode.ORDERLY +) +public class CollectUnCollectNoteConsumer implements RocketMQListener { + + // 每秒创建 5000 个令牌 + private final RateLimiter rateLimiter = RateLimiter.create(5000); + @Resource + private NoteCollectionDOMapper noteCollectionDOMapper; + + @Override + public void onMessage(Message message) { + // 流量削峰:通过获取令牌,如果没有令牌可用,将阻塞,直到获得 + rateLimiter.acquire(); + + // 幂等性: 通过联合唯一索引保证 + + // 消息体 + String bodyJsonStr = new String(message.getBody()); + // 标签 + String tags = message.getTags(); + + log.info("==> CollectUnCollectNoteConsumer 消费了消息 {}, tags: {}", bodyJsonStr, tags); + + // 根据 MQ 标签,判断操作类型 + if (Objects.equals(tags, MQConstants.TAG_COLLECT)) { // 收藏笔记 + handleCollectNoteTagMessage(bodyJsonStr); + } else if (Objects.equals(tags, MQConstants.TAG_UN_COLLECT)) { // 取消收藏笔记 + handleUnCollectNoteTagMessage(bodyJsonStr); + } + } + + /** + * 处理取消收藏笔记的 MQ 消息 + * + * @param bodyJsonStr 消息体 + */ + private void handleUnCollectNoteTagMessage(String bodyJsonStr) { + + } + + /** + * 处理收藏笔记的 MQ 消息 + * + * @param bodyJsonStr 消息体 + */ + private void handleCollectNoteTagMessage(String bodyJsonStr) { + // 消息体 JSON 字符串转 DTO + CollectUnCollectNoteMqDTO collectUnCollectNoteMqDTO = JsonUtils.parseObject(bodyJsonStr, CollectUnCollectNoteMqDTO.class); + + if (Objects.isNull(collectUnCollectNoteMqDTO)) return; + + // 用户ID + Long userId = collectUnCollectNoteMqDTO.getUserId(); + // 收藏的笔记ID + Long noteId = collectUnCollectNoteMqDTO.getNoteId(); + // 操作类型 + Integer type = collectUnCollectNoteMqDTO.getType(); + // 收藏时间 + LocalDateTime createTime = collectUnCollectNoteMqDTO.getCreateTime(); + + // 构建 DO 对象 + NoteCollectionDO noteCollectionDO = NoteCollectionDO.builder() + .userId(userId) + .noteId(noteId) + .createTime(createTime) + .status(type) + .build(); + + // 添加或更新笔记收藏记录 + boolean isSuccess = noteCollectionDOMapper.insertOrUpdate(noteCollectionDO); + + // TODO: 发送计数 MQ + } +} diff --git a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/constant/MQConstants.java b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/constant/MQConstants.java index afca2b3..0832f2d 100644 --- a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/constant/MQConstants.java +++ b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/constant/MQConstants.java @@ -22,6 +22,11 @@ public interface MQConstants { */ String TOPIC_COUNT_NOTE_LIKE = "CountNoteLikeTopic"; + /** + * Topic: 收藏、取消收藏共用一个 + */ + String TOPIC_COLLECT_OR_UN_COLLECT = "CollectUnCollectTopic"; + /** * 点赞标签 */ @@ -31,4 +36,14 @@ public interface MQConstants { * Tag 标签:取消点赞 */ String TAG_UNLIKE = "Unlike"; + + /** + * Tag 标签:收藏 + */ + String TAG_COLLECT = "Collect"; + + /** + * Tag 标签:取消收藏 + */ + String TAG_UN_COLLECT = "UnCollect"; } \ No newline at end of file diff --git a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/domain/mapper/NoteCollectionDOMapper.java b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/domain/mapper/NoteCollectionDOMapper.java index c79dfba..f9e3b0a 100644 --- a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/domain/mapper/NoteCollectionDOMapper.java +++ b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/domain/mapper/NoteCollectionDOMapper.java @@ -6,4 +6,11 @@ import org.apache.ibatis.annotations.Mapper; @Mapper public interface NoteCollectionDOMapper extends BaseMapper { + /** + * 新增笔记收藏记录,若已存在,则更新笔记收藏记录 + * + * @param noteCollectionDO 笔记收藏记录 + * @return 是否成功 + */ + boolean insertOrUpdate(NoteCollectionDO noteCollectionDO); } \ No newline at end of file diff --git a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/enums/CollectUnCollectNoteTypeEnum.java b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/enums/CollectUnCollectNoteTypeEnum.java new file mode 100644 index 0000000..5c82755 --- /dev/null +++ b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/enums/CollectUnCollectNoteTypeEnum.java @@ -0,0 +1,17 @@ +package com.hanserwei.hannote.note.biz.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +@Getter +@AllArgsConstructor +public enum CollectUnCollectNoteTypeEnum { + // 收藏 + COLLECT(1), + // 取消收藏 + UN_COLLECT(0), + ; + + private final Integer code; + +} \ No newline at end of file diff --git a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/model/dto/CollectUnCollectNoteMqDTO.java b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/model/dto/CollectUnCollectNoteMqDTO.java new file mode 100644 index 0000000..c0ff6bd --- /dev/null +++ b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/model/dto/CollectUnCollectNoteMqDTO.java @@ -0,0 +1,26 @@ +package com.hanserwei.hannote.note.biz.model.dto; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class CollectUnCollectNoteMqDTO { + + private Long userId; + + private Long noteId; + + /** + * 0: 取消收藏, 1:收藏 + */ + private Integer type; + + private LocalDateTime createTime; +} diff --git a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/service/impl/NoteServiceImpl.java b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/service/impl/NoteServiceImpl.java index c5ae350..7b22e1a 100644 --- a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/service/impl/NoteServiceImpl.java +++ b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/service/impl/NoteServiceImpl.java @@ -22,6 +22,7 @@ import com.hanserwei.hannote.note.biz.domain.dataobject.NoteLikeDO; import com.hanserwei.hannote.note.biz.domain.dataobject.TopicDO; import com.hanserwei.hannote.note.biz.domain.mapper.NoteDOMapper; import com.hanserwei.hannote.note.biz.enums.*; +import com.hanserwei.hannote.note.biz.model.dto.CollectUnCollectNoteMqDTO; import com.hanserwei.hannote.note.biz.model.dto.LikeUnlikeNoteMqDTO; import com.hanserwei.hannote.note.biz.model.vo.*; import com.hanserwei.hannote.note.biz.rpc.DistributedIdGeneratorRpcService; @@ -924,7 +925,36 @@ public class NoteServiceImpl extends ServiceImpl implement } - // TODO: 4. 发送 MQ, 将收藏数据落库 + // 4. 发送 MQ, 将收藏数据落库 + // 构建消息体 DTO + CollectUnCollectNoteMqDTO collectUnCollectNoteMqDTO = CollectUnCollectNoteMqDTO.builder() + .userId(userId) + .noteId(noteId) + .type(CollectUnCollectNoteTypeEnum.COLLECT.getCode()) // 收藏笔记 + .createTime(now) + .build(); + + // 构建消息对象,并将 DTO 转成 Json 字符串设置到消息体中 + Message message = MessageBuilder.withPayload(JsonUtils.toJsonString(collectUnCollectNoteMqDTO)) + .build(); + + // 通过冒号连接, 可让 MQ 发送给主题 Topic 时,携带上标签 Tag + String destination = MQConstants.TOPIC_COLLECT_OR_UN_COLLECT + ":" + MQConstants.TAG_COLLECT; + + String hashKey = String.valueOf(userId); + + // 异步发送顺序 MQ 消息,提升接口响应速度 + rocketMQTemplate.asyncSendOrderly(destination, message, hashKey, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + log.info("==> 【笔记收藏】MQ 发送成功,SendResult: {}", sendResult); + } + + @Override + public void onException(Throwable throwable) { + log.error("==> 【笔记收藏】MQ 发送异常: ", throwable); + } + }); return Response.success(); } diff --git a/han-note-note/han-note-note-biz/src/main/resources/mapperxml/NoteCollectionDOMapper.xml b/han-note-note/han-note-note-biz/src/main/resources/mapperxml/NoteCollectionDOMapper.xml index 3d19e02..886f334 100644 --- a/han-note-note/han-note-note-biz/src/main/resources/mapperxml/NoteCollectionDOMapper.xml +++ b/han-note-note/han-note-note-biz/src/main/resources/mapperxml/NoteCollectionDOMapper.xml @@ -14,4 +14,11 @@ id, user_id, note_id, create_time, `status` + + + INSERT INTO t_note_collection (user_id, note_id, create_time, status) + VALUES (#{userId}, #{noteId}, #{createTime}, #{status}) + ON DUPLICATE KEY UPDATE + create_time = #{createTime}, status = #{status}; + \ No newline at end of file