feat(count): 实现评论发布后异步更新笔记评论数功能
- 新增 CountPublishCommentMqDTO 用于传输评论计数消息 - 在评论服务中添加异步发送评论计数消息逻辑 - 新建 CountNoteCommentConsumer 消费评论计数消息并批量更新笔记评论数 - 扩展 t_comment 表结构,新增 child_comment_total 字段 - 更新 MQ 常量配置,添加评论计数相关 Topic 定义 - 调整 LIKE/UNLIKE 和 COLLECT/UNCOLLECT 消费者中的注解使用(防止循环依赖) - 修改 gateApi.http 中的测试用例内容以适配新功能
This commit is contained in:
@@ -7,4 +7,9 @@ public interface MQConstants {
|
||||
*/
|
||||
String TOPIC_PUBLISH_COMMENT = "PublishCommentTopic";
|
||||
|
||||
/**
|
||||
* Topic: 笔记评论总数计数
|
||||
*/
|
||||
String TOPIC_COUNT_NOTE_COMMENT = "CountNoteCommentTopic";
|
||||
|
||||
}
|
||||
@@ -10,6 +10,7 @@ import com.hanserwei.hannote.comment.biz.domain.dataobject.CommentDO;
|
||||
import com.hanserwei.hannote.comment.biz.domain.mapper.CommentDOMapper;
|
||||
import com.hanserwei.hannote.comment.biz.enums.CommentLevelEnum;
|
||||
import com.hanserwei.hannote.comment.biz.model.bo.CommentBO;
|
||||
import com.hanserwei.hannote.comment.biz.model.dto.CountPublishCommentMqDTO;
|
||||
import com.hanserwei.hannote.comment.biz.model.dto.PublishCommentMqDTO;
|
||||
import com.hanserwei.hannote.comment.biz.rpc.KeyValueRpcService;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
@@ -20,11 +21,15 @@ import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.apache.rocketmq.client.producer.SendCallback;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.support.TransactionTemplate;
|
||||
|
||||
@@ -48,6 +53,8 @@ public class Comment2DBConsumer {
|
||||
private TransactionTemplate transactionTemplate;
|
||||
@Resource
|
||||
private KeyValueRpcService keyValueRpcService;
|
||||
@Resource
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
private DefaultMQPushConsumer consumer;
|
||||
|
||||
@@ -165,10 +172,10 @@ public class Comment2DBConsumer {
|
||||
|
||||
log.info("## 清洗后的 CommentBOS: {}", JsonUtils.toJsonString(commentBOS));
|
||||
// 编程式事务,保证整体操作的原子性
|
||||
transactionTemplate.execute(status -> {
|
||||
Integer insertedRows = transactionTemplate.execute(status -> {
|
||||
try {
|
||||
// 先批量存入评论元数据
|
||||
commentDOMapper.batchInsert(commentBOS);
|
||||
int count = commentDOMapper.batchInsert(commentBOS);
|
||||
|
||||
// 过滤出评论内容不为空的 BO
|
||||
List<CommentBO> commentContentNotEmptyBOS = commentBOS.stream()
|
||||
@@ -179,7 +186,7 @@ public class Comment2DBConsumer {
|
||||
keyValueRpcService.batchSaveCommentContent(commentContentNotEmptyBOS);
|
||||
}
|
||||
|
||||
return true;
|
||||
return count;
|
||||
} catch (Exception ex) {
|
||||
status.setRollbackOnly(); // 标记事务为回滚
|
||||
log.error("", ex);
|
||||
@@ -187,6 +194,34 @@ public class Comment2DBConsumer {
|
||||
}
|
||||
});
|
||||
|
||||
// 如果批量插入的行数大于 0
|
||||
if (Objects.nonNull(insertedRows) && insertedRows > 0) {
|
||||
// 构建发送给计数服务的 DTO 集合
|
||||
List<CountPublishCommentMqDTO> countPublishCommentMqDTOS = publishCommentMqDTOS.stream()
|
||||
.map(publishCommentMqDTO -> CountPublishCommentMqDTO.builder()
|
||||
.noteId(publishCommentMqDTO.getNoteId())
|
||||
.commentId(publishCommentMqDTO.getCommentId())
|
||||
.build())
|
||||
.toList();
|
||||
|
||||
// 异步发送计数 MQ
|
||||
org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(JsonUtils.toJsonString(countPublishCommentMqDTOS))
|
||||
.build();
|
||||
|
||||
// 异步发送 MQ 消息
|
||||
rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_COMMENT, message, new SendCallback() {
|
||||
@Override
|
||||
public void onSuccess(SendResult sendResult) {
|
||||
log.info("==> 【计数: 评论发布】MQ 发送成功,SendResult: {}", sendResult);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable throwable) {
|
||||
log.error("==> 【计数: 评论发布】MQ 发送异常: ", throwable);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// 手动 ACK,告诉 RocketMQ 这批次消息消费成功
|
||||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||
} catch (Exception e) {
|
||||
|
||||
@@ -0,0 +1,24 @@
|
||||
package com.hanserwei.hannote.comment.biz.model.dto;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Builder
|
||||
public class CountPublishCommentMqDTO {
|
||||
|
||||
/**
|
||||
* 笔记 ID
|
||||
*/
|
||||
private Long noteId;
|
||||
|
||||
/**
|
||||
* 评论 ID
|
||||
*/
|
||||
private Long commentId;
|
||||
|
||||
}
|
||||
@@ -2,6 +2,11 @@ package com.hanserwei.hannote.count.biz.constant;
|
||||
|
||||
public interface MQConstants {
|
||||
|
||||
/**
|
||||
* Topic: 笔记评论总数计数
|
||||
*/
|
||||
String TOPIC_COUNT_NOTE_COMMENT = "CountNoteCommentTopic";
|
||||
|
||||
/**
|
||||
* Topic: 计数 - 笔记点赞数
|
||||
*/
|
||||
@@ -32,21 +37,11 @@ public interface MQConstants {
|
||||
*/
|
||||
String TOPIC_COUNT_FOLLOWING_2_DB = "CountFollowing2DBTopic";
|
||||
|
||||
/**
|
||||
* Topic: 计数 - 笔记点赞数
|
||||
*/
|
||||
String TOPIC_COUNT_NOTE_LIKE = "CountNoteLikeTopic";
|
||||
|
||||
/**
|
||||
* Topic: 计数 - 笔记点赞数落库
|
||||
*/
|
||||
String TOPIC_COUNT_NOTE_LIKE_2_DB = "CountNoteLike2DBTTopic";
|
||||
|
||||
/**
|
||||
* Topic: 计数 - 笔记收藏数
|
||||
*/
|
||||
String TOPIC_COUNT_NOTE_COLLECT = "CountNoteCollectTopic";
|
||||
|
||||
/**
|
||||
* Topic: 计数 - 笔记收藏数落库
|
||||
*/
|
||||
|
||||
@@ -0,0 +1,76 @@
|
||||
package com.hanserwei.hannote.count.biz.consumer;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import com.github.phantomthief.collection.BufferTrigger;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.hanserwei.framework.common.utils.JsonUtils;
|
||||
import com.hanserwei.hannote.count.biz.constant.MQConstants;
|
||||
import com.hanserwei.hannote.count.biz.domain.mapper.NoteCountDOMapper;
|
||||
import com.hanserwei.hannote.count.biz.model.dto.CountPublishCommentMqDTO;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Component
|
||||
@RocketMQMessageListener(consumerGroup = "han_note_group_" + MQConstants.TOPIC_COUNT_NOTE_COMMENT, // Group 组
|
||||
topic = MQConstants.TOPIC_COUNT_NOTE_COMMENT // 主题 Topic
|
||||
)
|
||||
@Slf4j
|
||||
public class CountNoteCommentConsumer implements RocketMQListener<String> {
|
||||
|
||||
@Resource
|
||||
private NoteCountDOMapper noteCountDOMapper;
|
||||
|
||||
private final BufferTrigger<String> bufferTrigger = BufferTrigger.<String>batchBlocking()
|
||||
.bufferSize(50000) // 缓存队列的最大容量
|
||||
.batchSize(1000) // 一批次最多聚合 1000 条
|
||||
.linger(Duration.ofSeconds(1)) // 多久聚合一次(1s 一次)
|
||||
.setConsumerEx(this::consumeMessage) // 设置消费者方法
|
||||
.build();
|
||||
|
||||
@Override
|
||||
public void onMessage(String body) {
|
||||
// 往 bufferTrigger 中添加元素
|
||||
bufferTrigger.enqueue(body);
|
||||
}
|
||||
|
||||
private void consumeMessage(List<String> bodys) {
|
||||
log.info("==> 【笔记评论数】聚合消息, size: {}", bodys.size());
|
||||
log.info("==> 【笔记评论数】聚合消息, {}", JsonUtils.toJsonString(bodys));
|
||||
|
||||
// 将聚合后的消息体 Json 转 List<CountPublishCommentMqDTO>
|
||||
List<CountPublishCommentMqDTO> countPublishCommentMqDTOList = Lists.newArrayList();
|
||||
bodys.forEach(body -> {
|
||||
try {
|
||||
List<CountPublishCommentMqDTO> list = JsonUtils.parseList(body, CountPublishCommentMqDTO.class);
|
||||
countPublishCommentMqDTOList.addAll(list);
|
||||
} catch (Exception e) {
|
||||
log.error("", e);
|
||||
}
|
||||
});
|
||||
|
||||
// 按笔记 ID 进行分组
|
||||
Map<Long, List<CountPublishCommentMqDTO>> groupMap = countPublishCommentMqDTOList.stream()
|
||||
.collect(Collectors.groupingBy(CountPublishCommentMqDTO::getNoteId));
|
||||
|
||||
// 循环分组字典
|
||||
for (Map.Entry<Long, List<CountPublishCommentMqDTO>> entry : groupMap.entrySet()) {
|
||||
// 笔记 ID
|
||||
Long noteId = entry.getKey();
|
||||
// 评论数
|
||||
int count = CollUtil.size(entry.getValue());
|
||||
|
||||
// 若评论数大于零,则执行更新操作:累加评论总数
|
||||
if (count > 0) {
|
||||
noteCountDOMapper.insertOrUpdateCommentTotalByNoteId(count, noteId);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -25,4 +25,13 @@ public interface NoteCountDOMapper extends BaseMapper<NoteCountDO> {
|
||||
* @return 影响行数
|
||||
*/
|
||||
int insertOrUpdateCollectTotalByNoteId(@Param("count") Integer count, @Param("noteId") Long noteId);
|
||||
|
||||
/**
|
||||
* 添加记录或更新笔记评论数
|
||||
*
|
||||
* @param count 评论数
|
||||
* @param noteId 笔记ID
|
||||
* @return 影响行数
|
||||
*/
|
||||
int insertOrUpdateCommentTotalByNoteId(@Param("count") int count, @Param("noteId") Long noteId);
|
||||
}
|
||||
@@ -0,0 +1,24 @@
|
||||
package com.hanserwei.hannote.count.biz.model.dto;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Builder
|
||||
public class CountPublishCommentMqDTO {
|
||||
|
||||
/**
|
||||
* 笔记 ID
|
||||
*/
|
||||
private Long noteId;
|
||||
|
||||
/**
|
||||
* 评论 ID
|
||||
*/
|
||||
private Long commentId;
|
||||
|
||||
}
|
||||
@@ -26,4 +26,10 @@
|
||||
VALUES (#{noteId}, #{count})
|
||||
ON DUPLICATE KEY UPDATE collect_total = collect_total + (#{count});
|
||||
</insert>
|
||||
|
||||
<insert id="insertOrUpdateCommentTotalByNoteId" parameterType="map">
|
||||
INSERT INTO t_note_count (note_id, comment_total)
|
||||
VALUES (#{noteId}, #{count})
|
||||
ON DUPLICATE KEY UPDATE comment_total = comment_total + (#{count});
|
||||
</insert>
|
||||
</mapper>
|
||||
@@ -35,7 +35,7 @@ public class CollectUnCollectNoteConsumer {
|
||||
private final RateLimiter rateLimiter = RateLimiter.create(5000);
|
||||
@Value("${rocketmq.name-server}")
|
||||
private String nameServer;
|
||||
@Resource
|
||||
|
||||
private DefaultMQPushConsumer consumer;
|
||||
@Resource
|
||||
private NoteCollectionDOMapper noteCollectionDOMapper;
|
||||
|
||||
@@ -35,7 +35,7 @@ public class LikeUnlikeNoteConsumer {
|
||||
private final RateLimiter rateLimiter = RateLimiter.create(5000);
|
||||
@Value("${rocketmq.name-server}")
|
||||
private String nameServer;
|
||||
@Resource
|
||||
|
||||
private DefaultMQPushConsumer consumer;
|
||||
@Resource
|
||||
private NoteLikeDOMapper noteLikeDOMapper;
|
||||
|
||||
@@ -298,9 +298,8 @@ Authorization: Bearer {{token}}
|
||||
|
||||
{
|
||||
"noteId": 1862481582414102549,
|
||||
"content": "这是一条回复测试评论",
|
||||
"imageUrl": "https://cdn.pixabay.com/photo/2025/10/05/15/06/autumn-9875155_1280.jpg",
|
||||
"replyCommentId": 2001
|
||||
"content": "这是一条测试评论计数的评论111",
|
||||
"imageUrl": "https://cdn.pixabay.com/photo/2025/10/05/15/06/autumn-9875155_1280.jpg"
|
||||
}
|
||||
|
||||
### 批量添加评论
|
||||
|
||||
@@ -280,4 +280,9 @@ CREATE TABLE `t_comment_like`
|
||||
DEFAULT CHARSET = utf8mb4
|
||||
COLLATE = utf8mb4_unicode_ci COMMENT ='评论点赞表';
|
||||
|
||||
-- 表:t_comment表冗余字段
|
||||
alter table t_comment
|
||||
add column `child_comment_total` bigint(20) unsigned DEFAULT '0' COMMENT '二级评论总数(只有一级评论才需要统计)';
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user