feat(comment): 实现评论异步消费与内容存储

- 新增评论内容批量存储接口与实现
- 实现MQ消息消费端处理评论发布逻辑
- 支持一级与二级评论的层级关系构建
- 添加评论内容与元数据分离存储机制
- 集成分布式ID生成服务用于评论ID生成
- 完善评论相关DTO、DO、BO模型类
- 添加Cassandra数据库操作支持
- 实现Feign接口调用与事务控制
This commit is contained in:
2025-11-05 19:19:19 +08:00
parent c37b16ff42
commit a37e76c87c
22 changed files with 575 additions and 4 deletions

View File

@@ -3,10 +3,12 @@ package com.hanserwei.hannote.comment.biz;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.retry.annotation.EnableRetry;
@SpringBootApplication
@MapperScan("com.hanserwei.hannote.comment.biz.domain.mapper")
@EnableFeignClients("com.hanserwei.hannote")
@EnableRetry
public class HannoteCommentApplication {
public static void main(String[] args) {

View File

@@ -1,9 +1,21 @@
package com.hanserwei.hannote.comment.biz.consumer;
import cn.hutool.core.collection.CollUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.RateLimiter;
import com.hanserwei.framework.common.utils.JsonUtils;
import com.hanserwei.hannote.comment.biz.constants.MQConstants;
import com.hanserwei.hannote.comment.biz.domain.dataobject.CommentDO;
import com.hanserwei.hannote.comment.biz.domain.mapper.CommentDOMapper;
import com.hanserwei.hannote.comment.biz.enums.CommentLevelEnum;
import com.hanserwei.hannote.comment.biz.model.bo.CommentBO;
import com.hanserwei.hannote.comment.biz.model.dto.PublishCommentMqDTO;
import com.hanserwei.hannote.comment.biz.rpc.KeyValueRpcService;
import jakarta.annotation.PreDestroy;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
@@ -14,8 +26,13 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
@SuppressWarnings("UnstableApiUsage")
@Component
@@ -25,6 +42,13 @@ public class Comment2DBConsumer {
@Value("${rocketmq.name-server}")
private String nameServer;
@Resource
private CommentDOMapper commentDOMapper;
@Resource
private TransactionTemplate transactionTemplate;
@Resource
private KeyValueRpcService keyValueRpcService;
private DefaultMQPushConsumer consumer;
// 每秒创建 1000 个令牌
@@ -53,6 +77,9 @@ public class Comment2DBConsumer {
// 设置每批次消费的最大消息数量,这里设置为 30表示每次拉取时最多消费 30 条消息
consumer.setConsumeMessageBatchMaxSize(30);
// 消息体 Json 字符串转 DTO
List<PublishCommentMqDTO> publishCommentMqDTOS = Lists.newArrayList();
// 注册消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
log.info("==> 本批次消息大小: {}", msgs.size());
@@ -64,8 +91,101 @@ public class Comment2DBConsumer {
String message = new String(msg.getBody());
log.info("==> Consumer - Received message: {}", message);
// TODO: 业务处理
publishCommentMqDTOS.add(JsonUtils.parseObject(message, PublishCommentMqDTO.class));
}
// 提取所有不为空的回复评论 ID
List<Long> replyCommentIds = publishCommentMqDTOS.stream()
.filter(Objects::nonNull)
.map(PublishCommentMqDTO::getReplyCommentId)
.toList();
// 批量查询相关回复评论记录
List<CommentDO> replyCommentDOS = null;
if (CollUtil.isNotEmpty(replyCommentIds)) {
// 批量查询数据库
replyCommentDOS = commentDOMapper.selectByCommentIds(replyCommentIds);
}
// DO 集合转 <评论 ID - 评论 DO> 字典, 以方便后续查找
Map<Long, CommentDO> commentIdAndCommentDOMap = Maps.newHashMap();
if (CollUtil.isNotEmpty(replyCommentDOS)) {
commentIdAndCommentDOMap = replyCommentDOS.stream()
.collect(Collectors.toMap(CommentDO::getId, commentDO -> commentDO));
}
// DTO 转 BO
List<CommentBO> commentBOS = Lists.newArrayList();
for (PublishCommentMqDTO publishCommentMqDTO : publishCommentMqDTOS) {
String imageUrl = publishCommentMqDTO.getImageUrl();
CommentBO commentBO = CommentBO.builder()
.id(publishCommentMqDTO.getCommentId())
.noteId(publishCommentMqDTO.getNoteId())
.userId(publishCommentMqDTO.getCreatorId())
.isContentEmpty(true) // 默认评论内容为空
.imageUrl(StringUtils.isBlank(imageUrl) ? "" : imageUrl)
.level(CommentLevelEnum.ONE.getCode()) // 默认为一级评论
.parentId(publishCommentMqDTO.getNoteId()) // 默认设置为所属笔记 ID
.createTime(publishCommentMqDTO.getCreateTime())
.updateTime(publishCommentMqDTO.getCreateTime())
.isTop(false)
.replyTotal(0L)
.likeTotal(0L)
.replyCommentId(0L)
.replyUserId(0L)
.build();
// 评论内容若不为空
String content = publishCommentMqDTO.getContent();
if (StringUtils.isNotBlank(content)) {
commentBO.setContentUuid(UUID.randomUUID().toString()); // 生成评论内容的 UUID 标识
commentBO.setIsContentEmpty(false);
commentBO.setContent(content);
}
// 设置评论级别、回复用户 ID (reply_user_id)、父评论 ID (parent_id)
Long replyCommentId = publishCommentMqDTO.getReplyCommentId();
if (Objects.nonNull(replyCommentId)) {
CommentDO replyCommentDO = commentIdAndCommentDOMap.get(replyCommentId);
if (Objects.nonNull(replyCommentDO)) {
// 若回复的评论 ID 不为空,说明是二级评论
commentBO.setLevel(CommentLevelEnum.TWO.getCode());
commentBO.setReplyCommentId(publishCommentMqDTO.getReplyCommentId());
// 父评论 ID
commentBO.setParentId(replyCommentDO.getId());
if (Objects.equals(replyCommentDO.getLevel(), CommentLevelEnum.TWO.getCode())) { // 如果回复的评论属于二级评论
commentBO.setParentId(replyCommentDO.getParentId());
}
// 回复的哪个用户
commentBO.setReplyUserId(replyCommentDO.getUserId());
}
}
commentBOS.add(commentBO);
}
log.info("## 清洗后的 CommentBOS: {}", JsonUtils.toJsonString(commentBOS));
// 编程式事务,保证整体操作的原子性
transactionTemplate.execute(status -> {
try {
// 先批量存入评论元数据
commentDOMapper.batchInsert(commentBOS);
// 过滤出评论内容不为空的 BO
List<CommentBO> commentContentNotEmptyBOS = commentBOS.stream()
.filter(commentBO -> Boolean.FALSE.equals(commentBO.getIsContentEmpty()))
.toList();
if (CollUtil.isNotEmpty(commentContentNotEmptyBOS)) {
// 批量存入评论内容
keyValueRpcService.batchSaveCommentContent(commentContentNotEmptyBOS);
}
return true;
} catch (Exception ex) {
status.setRollbackOnly(); // 标记事务为回滚
log.error("", ex);
throw ex;
}
});
// 手动 ACK告诉 RocketMQ 这批次消息消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

View File

@@ -2,8 +2,28 @@ package com.hanserwei.hannote.comment.biz.domain.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.hanserwei.hannote.comment.biz.domain.dataobject.CommentDO;
import com.hanserwei.hannote.comment.biz.model.bo.CommentBO;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
@Mapper
public interface CommentDOMapper extends BaseMapper<CommentDO> {
/**
* 根据评论 ID 批量查询
*
* @param commentIds 评论 ID 列表
* @return 评论列表
*/
List<CommentDO> selectByCommentIds(@Param("commentIds") List<Long> commentIds);
/**
* 批量插入评论
*
* @param comments 评论列表
* @return 插入数量
*/
int batchInsert(@Param("comments") List<CommentBO> comments);
}

View File

@@ -0,0 +1,17 @@
package com.hanserwei.hannote.comment.biz.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
@Getter
@AllArgsConstructor
public enum CommentLevelEnum {
// 一级评论
ONE(1),
// 二级评论
TWO(2),
;
private final Integer code;
}

View File

@@ -0,0 +1,46 @@
package com.hanserwei.hannote.comment.biz.model.bo;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class CommentBO {
private Long id;
private Long noteId;
private Long userId;
private String contentUuid;
private String content;
private Boolean isContentEmpty;
private String imageUrl;
private Integer level;
private Long replyTotal;
private Long likeTotal;
private Long parentId;
private Long replyCommentId;
private Long replyUserId;
private Boolean isTop;
private LocalDateTime createTime;
private LocalDateTime updateTime;
}

View File

@@ -40,4 +40,9 @@ public class PublishCommentMqDTO {
*/
private Long creatorId;
/**
* 评论 ID
*/
private Long commentId;
}

View File

@@ -0,0 +1,22 @@
package com.hanserwei.hannote.comment.biz.rpc;
import com.hanserwei.hannote.distributed.id.generator.api.DistributedIdGeneratorFeignApi;
import jakarta.annotation.Resource;
import org.springframework.stereotype.Component;
@Component
public class DistributedIdGeneratorRpcService {
@Resource
private DistributedIdGeneratorFeignApi distributedIdGeneratorFeignApi;
/**
* 生成评论 ID
*
* @return 评论 ID
*/
public String generateCommentId() {
return distributedIdGeneratorFeignApi.getSegmentId("leaf-segment-comment-id");
}
}

View File

@@ -0,0 +1,57 @@
package com.hanserwei.hannote.comment.biz.rpc;
import com.google.common.collect.Lists;
import com.hanserwei.framework.common.constant.DateConstants;
import com.hanserwei.framework.common.response.Response;
import com.hanserwei.hannote.comment.biz.model.bo.CommentBO;
import com.hanserwei.hannote.kv.api.KeyValueFeignApi;
import com.hanserwei.hannote.kv.dto.req.BatchAddCommentContentReqDTO;
import com.hanserwei.hannote.kv.dto.req.CommentContentReqDTO;
import jakarta.annotation.Resource;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class KeyValueRpcService {
@Resource
private KeyValueFeignApi keyValueFeignApi;
/**
* 批量存储评论内容
*
* @param commentBOS 评论 BO
* @return 批量保存结果
*/
public boolean batchSaveCommentContent(List<CommentBO> commentBOS) {
List<CommentContentReqDTO> comments = Lists.newArrayList();
// BO 转 DTO
commentBOS.forEach(commentBO -> {
CommentContentReqDTO commentContentReqDTO = CommentContentReqDTO.builder()
.noteId(commentBO.getNoteId())
.content(commentBO.getContent())
.contentId(commentBO.getContentUuid())
.yearMonth(commentBO.getCreateTime().format(DateConstants.DATE_FORMAT_Y_M))
.build();
comments.add(commentContentReqDTO);
});
// 构建接口入参实体类
BatchAddCommentContentReqDTO batchAddCommentContentReqDTO = BatchAddCommentContentReqDTO.builder()
.comments(comments)
.build();
// 调用 KV 存储服务
Response<?> response = keyValueFeignApi.batchAddCommentContent(batchAddCommentContentReqDTO);
// 若返参中 success 为 false, 则主动抛出异常,以便调用层回滚事务
if (!response.isSuccess()) {
throw new RuntimeException("批量保存评论内容失败");
}
return true;
}
}

View File

@@ -11,6 +11,7 @@ import com.hanserwei.hannote.comment.biz.domain.mapper.CommentDOMapper;
import com.hanserwei.hannote.comment.biz.model.dto.PublishCommentMqDTO;
import com.hanserwei.hannote.comment.biz.model.vo.PublishCommentReqVO;
import com.hanserwei.hannote.comment.biz.retry.SendMqRetryHelper;
import com.hanserwei.hannote.comment.biz.rpc.DistributedIdGeneratorRpcService;
import com.hanserwei.hannote.comment.biz.service.CommentService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
@@ -25,6 +26,8 @@ public class CommentServiceImpl extends ServiceImpl<CommentDOMapper, CommentDO>
@Resource
private SendMqRetryHelper sendMqRetryHelper;
@Resource
private DistributedIdGeneratorRpcService distributedIdGeneratorRpcService;
@Override
public Response<?> publishComment(PublishCommentReqVO publishCommentReqVO) {
@@ -40,6 +43,9 @@ public class CommentServiceImpl extends ServiceImpl<CommentDOMapper, CommentDO>
// 发布者ID
Long creatorId = LoginUserContextHolder.getUserId();
// RPC: 调用分布式 ID 生成服务,生成评论 ID
String commentId = distributedIdGeneratorRpcService.generateCommentId();
// 发送消息
// 构造MQ消息体
PublishCommentMqDTO publishCommentMqDTO = PublishCommentMqDTO.builder()
@@ -49,6 +55,7 @@ public class CommentServiceImpl extends ServiceImpl<CommentDOMapper, CommentDO>
.replyCommentId(publishCommentReqVO.getReplyCommentId())
.createTime(LocalDateTime.now())
.creatorId(creatorId)
.commentId(Long.valueOf(commentId))
.build();
// 发送 MQ 消息,包含重试机制
sendMqRetryHelper.asyncSend(MQConstants.TOPIC_PUBLISH_COMMENT, JsonUtils.toJsonString(publishCommentMqDTO));

View File

@@ -25,4 +25,31 @@
id, note_id, user_id, content_uuid, is_content_empty, image_url, `level`, reply_total,
like_total, parent_id, reply_comment_id, reply_user_id, is_top, create_time, update_time
</sql>
<select id="selectByCommentIds" resultMap="BaseResultMap" parameterType="list">
select id,
level,
parent_id,
user_id
from t_comment
where id in
<foreach collection="commentIds" open="(" separator="," close=")" item="commentId">
#{commentId}
</foreach>
</select>
<insert id="batchInsert" parameterType="list">
insert IGNORE into t_comment (id, note_id, user_id,
content_uuid, is_content_empty, image_url,
`level`, reply_total, like_total,
parent_id, reply_comment_id, reply_user_id,
is_top, create_time, update_time)
values
<foreach collection="comments" item="comment" separator=",">
( #{comment.id}, #{comment.noteId}, #{comment.userId}, #{comment.contentUuid}, #{comment.isContentEmpty}
, #{comment.imageUrl}, #{comment.level}, #{comment.replyTotal}, #{comment.likeTotal}, #{comment.parentId}
, #{comment.replyCommentId}, #{comment.replyUserId}, #{comment.isTop}, #{comment.createTime}
, #{comment.updateTime})
</foreach>
</insert>
</mapper>