From 63495b49384ed4d757481cdcbb320203fd744853 Mon Sep 17 00:00:00 2001 From: Hanserwei <2628273921@qq.com> Date: Fri, 7 Nov 2025 17:13:01 +0800 Subject: [PATCH] =?UTF-8?q?feat(count):=20=E5=AE=9E=E7=8E=B0=E8=AF=84?= =?UTF-8?q?=E8=AE=BA=E5=8F=91=E5=B8=83=E5=90=8E=E5=BC=82=E6=AD=A5=E6=9B=B4?= =?UTF-8?q?=E6=96=B0=E7=AC=94=E8=AE=B0=E8=AF=84=E8=AE=BA=E6=95=B0=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=20-=20=E6=96=B0=E5=A2=9E=20CountPublishCommentMqDTO?= =?UTF-8?q?=20=E7=94=A8=E4=BA=8E=E4=BC=A0=E8=BE=93=E8=AF=84=E8=AE=BA?= =?UTF-8?q?=E8=AE=A1=E6=95=B0=E6=B6=88=E6=81=AF=20-=20=E5=9C=A8=E8=AF=84?= =?UTF-8?q?=E8=AE=BA=E6=9C=8D=E5=8A=A1=E4=B8=AD=E6=B7=BB=E5=8A=A0=E5=BC=82?= =?UTF-8?q?=E6=AD=A5=E5=8F=91=E9=80=81=E8=AF=84=E8=AE=BA=E8=AE=A1=E6=95=B0?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E9=80=BB=E8=BE=91=20-=20=E6=96=B0=E5=BB=BA?= =?UTF-8?q?=20CountNoteCommentConsumer=20=E6=B6=88=E8=B4=B9=E8=AF=84?= =?UTF-8?q?=E8=AE=BA=E8=AE=A1=E6=95=B0=E6=B6=88=E6=81=AF=E5=B9=B6=E6=89=B9?= =?UTF-8?q?=E9=87=8F=E6=9B=B4=E6=96=B0=E7=AC=94=E8=AE=B0=E8=AF=84=E8=AE=BA?= =?UTF-8?q?=E6=95=B0=20-=20=E6=89=A9=E5=B1=95=20t=5Fcomment=20=E8=A1=A8?= =?UTF-8?q?=E7=BB=93=E6=9E=84=EF=BC=8C=E6=96=B0=E5=A2=9E=20child=5Fcomment?= =?UTF-8?q?=5Ftotal=20=E5=AD=97=E6=AE=B5=20-=20=E6=9B=B4=E6=96=B0=20MQ=20?= =?UTF-8?q?=E5=B8=B8=E9=87=8F=E9=85=8D=E7=BD=AE=EF=BC=8C=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E8=AF=84=E8=AE=BA=E8=AE=A1=E6=95=B0=E7=9B=B8=E5=85=B3=20Topic?= =?UTF-8?q?=20=E5=AE=9A=E4=B9=89=20-=20=E8=B0=83=E6=95=B4=20LIKE/UNLIKE=20?= =?UTF-8?q?=E5=92=8C=20COLLECT/UNCOLLECT=20=E6=B6=88=E8=B4=B9=E8=80=85?= =?UTF-8?q?=E4=B8=AD=E7=9A=84=E6=B3=A8=E8=A7=A3=E4=BD=BF=E7=94=A8=EF=BC=88?= =?UTF-8?q?=E9=98=B2=E6=AD=A2=E5=BE=AA=E7=8E=AF=E4=BE=9D=E8=B5=96=EF=BC=89?= =?UTF-8?q?=20-=20=E4=BF=AE=E6=94=B9=20gateApi.http=20=E4=B8=AD=E7=9A=84?= =?UTF-8?q?=E6=B5=8B=E8=AF=95=E7=94=A8=E4=BE=8B=E5=86=85=E5=AE=B9=E4=BB=A5?= =?UTF-8?q?=E9=80=82=E9=85=8D=E6=96=B0=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../comment/biz/constants/MQConstants.java | 5 ++ .../biz/consumer/Comment2DBConsumer.java | 41 +++++++++- .../model/dto/CountPublishCommentMqDTO.java | 24 ++++++ .../count/biz/constant/MQConstants.java | 15 ++-- .../consumer/CountNoteCommentConsumer.java | 76 +++++++++++++++++++ .../biz/domain/mapper/NoteCountDOMapper.java | 9 +++ .../model/dto/CountPublishCommentMqDTO.java | 24 ++++++ .../resources/mapperxml/NoteCountDOMapper.xml | 6 ++ .../CollectUnCollectNoteConsumer.java | 2 +- .../biz/comsumer/LikeUnlikeNoteConsumer.java | 2 +- http-client/gateApi.http | 5 +- sql/createTable.sql | 5 ++ 12 files changed, 196 insertions(+), 18 deletions(-) create mode 100644 han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/model/dto/CountPublishCommentMqDTO.java create mode 100644 han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteCommentConsumer.java create mode 100644 han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/model/dto/CountPublishCommentMqDTO.java diff --git a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/constants/MQConstants.java b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/constants/MQConstants.java index 6b12e6d..cd39c26 100644 --- a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/constants/MQConstants.java +++ b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/constants/MQConstants.java @@ -7,4 +7,9 @@ public interface MQConstants { */ String TOPIC_PUBLISH_COMMENT = "PublishCommentTopic"; + /** + * Topic: 笔记评论总数计数 + */ + String TOPIC_COUNT_NOTE_COMMENT = "CountNoteCommentTopic"; + } \ No newline at end of file diff --git a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/consumer/Comment2DBConsumer.java b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/consumer/Comment2DBConsumer.java index 6b9052e..2880692 100644 --- a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/consumer/Comment2DBConsumer.java +++ b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/consumer/Comment2DBConsumer.java @@ -10,6 +10,7 @@ import com.hanserwei.hannote.comment.biz.domain.dataobject.CommentDO; import com.hanserwei.hannote.comment.biz.domain.mapper.CommentDOMapper; import com.hanserwei.hannote.comment.biz.enums.CommentLevelEnum; import com.hanserwei.hannote.comment.biz.model.bo.CommentBO; +import com.hanserwei.hannote.comment.biz.model.dto.CountPublishCommentMqDTO; import com.hanserwei.hannote.comment.biz.model.dto.PublishCommentMqDTO; import com.hanserwei.hannote.comment.biz.rpc.KeyValueRpcService; import jakarta.annotation.PreDestroy; @@ -20,11 +21,15 @@ import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; +import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; import org.springframework.transaction.support.TransactionTemplate; @@ -48,6 +53,8 @@ public class Comment2DBConsumer { private TransactionTemplate transactionTemplate; @Resource private KeyValueRpcService keyValueRpcService; + @Resource + private RocketMQTemplate rocketMQTemplate; private DefaultMQPushConsumer consumer; @@ -165,10 +172,10 @@ public class Comment2DBConsumer { log.info("## 清洗后的 CommentBOS: {}", JsonUtils.toJsonString(commentBOS)); // 编程式事务,保证整体操作的原子性 - transactionTemplate.execute(status -> { + Integer insertedRows = transactionTemplate.execute(status -> { try { // 先批量存入评论元数据 - commentDOMapper.batchInsert(commentBOS); + int count = commentDOMapper.batchInsert(commentBOS); // 过滤出评论内容不为空的 BO List commentContentNotEmptyBOS = commentBOS.stream() @@ -179,7 +186,7 @@ public class Comment2DBConsumer { keyValueRpcService.batchSaveCommentContent(commentContentNotEmptyBOS); } - return true; + return count; } catch (Exception ex) { status.setRollbackOnly(); // 标记事务为回滚 log.error("", ex); @@ -187,6 +194,34 @@ public class Comment2DBConsumer { } }); + // 如果批量插入的行数大于 0 + if (Objects.nonNull(insertedRows) && insertedRows > 0) { + // 构建发送给计数服务的 DTO 集合 + List countPublishCommentMqDTOS = publishCommentMqDTOS.stream() + .map(publishCommentMqDTO -> CountPublishCommentMqDTO.builder() + .noteId(publishCommentMqDTO.getNoteId()) + .commentId(publishCommentMqDTO.getCommentId()) + .build()) + .toList(); + + // 异步发送计数 MQ + org.springframework.messaging.Message message = MessageBuilder.withPayload(JsonUtils.toJsonString(countPublishCommentMqDTOS)) + .build(); + + // 异步发送 MQ 消息 + rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_COMMENT, message, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + log.info("==> 【计数: 评论发布】MQ 发送成功,SendResult: {}", sendResult); + } + + @Override + public void onException(Throwable throwable) { + log.error("==> 【计数: 评论发布】MQ 发送异常: ", throwable); + } + }); + } + // 手动 ACK,告诉 RocketMQ 这批次消息消费成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { diff --git a/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/model/dto/CountPublishCommentMqDTO.java b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/model/dto/CountPublishCommentMqDTO.java new file mode 100644 index 0000000..bab5628 --- /dev/null +++ b/han-note-comment/han-note-comment-biz/src/main/java/com/hanserwei/hannote/comment/biz/model/dto/CountPublishCommentMqDTO.java @@ -0,0 +1,24 @@ +package com.hanserwei.hannote.comment.biz.model.dto; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class CountPublishCommentMqDTO { + + /** + * 笔记 ID + */ + private Long noteId; + + /** + * 评论 ID + */ + private Long commentId; + +} \ No newline at end of file diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/constant/MQConstants.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/constant/MQConstants.java index 21ce583..82315ca 100644 --- a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/constant/MQConstants.java +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/constant/MQConstants.java @@ -2,6 +2,11 @@ package com.hanserwei.hannote.count.biz.constant; public interface MQConstants { + /** + * Topic: 笔记评论总数计数 + */ + String TOPIC_COUNT_NOTE_COMMENT = "CountNoteCommentTopic"; + /** * Topic: 计数 - 笔记点赞数 */ @@ -32,21 +37,11 @@ public interface MQConstants { */ String TOPIC_COUNT_FOLLOWING_2_DB = "CountFollowing2DBTopic"; - /** - * Topic: 计数 - 笔记点赞数 - */ - String TOPIC_COUNT_NOTE_LIKE = "CountNoteLikeTopic"; - /** * Topic: 计数 - 笔记点赞数落库 */ String TOPIC_COUNT_NOTE_LIKE_2_DB = "CountNoteLike2DBTTopic"; - /** - * Topic: 计数 - 笔记收藏数 - */ - String TOPIC_COUNT_NOTE_COLLECT = "CountNoteCollectTopic"; - /** * Topic: 计数 - 笔记收藏数落库 */ diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteCommentConsumer.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteCommentConsumer.java new file mode 100644 index 0000000..406fe8d --- /dev/null +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/consumer/CountNoteCommentConsumer.java @@ -0,0 +1,76 @@ +package com.hanserwei.hannote.count.biz.consumer; + +import cn.hutool.core.collection.CollUtil; +import com.github.phantomthief.collection.BufferTrigger; +import com.google.common.collect.Lists; +import com.hanserwei.framework.common.utils.JsonUtils; +import com.hanserwei.hannote.count.biz.constant.MQConstants; +import com.hanserwei.hannote.count.biz.domain.mapper.NoteCountDOMapper; +import com.hanserwei.hannote.count.biz.model.dto.CountPublishCommentMqDTO; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +@Component +@RocketMQMessageListener(consumerGroup = "han_note_group_" + MQConstants.TOPIC_COUNT_NOTE_COMMENT, // Group 组 + topic = MQConstants.TOPIC_COUNT_NOTE_COMMENT // 主题 Topic +) +@Slf4j +public class CountNoteCommentConsumer implements RocketMQListener { + + @Resource + private NoteCountDOMapper noteCountDOMapper; + + private final BufferTrigger bufferTrigger = BufferTrigger.batchBlocking() + .bufferSize(50000) // 缓存队列的最大容量 + .batchSize(1000) // 一批次最多聚合 1000 条 + .linger(Duration.ofSeconds(1)) // 多久聚合一次(1s 一次) + .setConsumerEx(this::consumeMessage) // 设置消费者方法 + .build(); + + @Override + public void onMessage(String body) { + // 往 bufferTrigger 中添加元素 + bufferTrigger.enqueue(body); + } + + private void consumeMessage(List bodys) { + log.info("==> 【笔记评论数】聚合消息, size: {}", bodys.size()); + log.info("==> 【笔记评论数】聚合消息, {}", JsonUtils.toJsonString(bodys)); + + // 将聚合后的消息体 Json 转 List + List countPublishCommentMqDTOList = Lists.newArrayList(); + bodys.forEach(body -> { + try { + List list = JsonUtils.parseList(body, CountPublishCommentMqDTO.class); + countPublishCommentMqDTOList.addAll(list); + } catch (Exception e) { + log.error("", e); + } + }); + + // 按笔记 ID 进行分组 + Map> groupMap = countPublishCommentMqDTOList.stream() + .collect(Collectors.groupingBy(CountPublishCommentMqDTO::getNoteId)); + + // 循环分组字典 + for (Map.Entry> entry : groupMap.entrySet()) { + // 笔记 ID + Long noteId = entry.getKey(); + // 评论数 + int count = CollUtil.size(entry.getValue()); + + // 若评论数大于零,则执行更新操作:累加评论总数 + if (count > 0) { + noteCountDOMapper.insertOrUpdateCommentTotalByNoteId(count, noteId); + } + } + } +} \ No newline at end of file diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/domain/mapper/NoteCountDOMapper.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/domain/mapper/NoteCountDOMapper.java index a5bee9e..00b7b12 100644 --- a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/domain/mapper/NoteCountDOMapper.java +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/domain/mapper/NoteCountDOMapper.java @@ -25,4 +25,13 @@ public interface NoteCountDOMapper extends BaseMapper { * @return 影响行数 */ int insertOrUpdateCollectTotalByNoteId(@Param("count") Integer count, @Param("noteId") Long noteId); + + /** + * 添加记录或更新笔记评论数 + * + * @param count 评论数 + * @param noteId 笔记ID + * @return 影响行数 + */ + int insertOrUpdateCommentTotalByNoteId(@Param("count") int count, @Param("noteId") Long noteId); } \ No newline at end of file diff --git a/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/model/dto/CountPublishCommentMqDTO.java b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/model/dto/CountPublishCommentMqDTO.java new file mode 100644 index 0000000..c79ae7b --- /dev/null +++ b/han-note-count/han-note-count-biz/src/main/java/com/hanserwei/hannote/count/biz/model/dto/CountPublishCommentMqDTO.java @@ -0,0 +1,24 @@ +package com.hanserwei.hannote.count.biz.model.dto; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class CountPublishCommentMqDTO { + + /** + * 笔记 ID + */ + private Long noteId; + + /** + * 评论 ID + */ + private Long commentId; + +} \ No newline at end of file diff --git a/han-note-count/han-note-count-biz/src/main/resources/mapperxml/NoteCountDOMapper.xml b/han-note-count/han-note-count-biz/src/main/resources/mapperxml/NoteCountDOMapper.xml index ccae724..df309d2 100644 --- a/han-note-count/han-note-count-biz/src/main/resources/mapperxml/NoteCountDOMapper.xml +++ b/han-note-count/han-note-count-biz/src/main/resources/mapperxml/NoteCountDOMapper.xml @@ -26,4 +26,10 @@ VALUES (#{noteId}, #{count}) ON DUPLICATE KEY UPDATE collect_total = collect_total + (#{count}); + + + INSERT INTO t_note_count (note_id, comment_total) + VALUES (#{noteId}, #{count}) + ON DUPLICATE KEY UPDATE comment_total = comment_total + (#{count}); + \ No newline at end of file diff --git a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/CollectUnCollectNoteConsumer.java b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/CollectUnCollectNoteConsumer.java index 7efe22b..c7533e4 100644 --- a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/CollectUnCollectNoteConsumer.java +++ b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/CollectUnCollectNoteConsumer.java @@ -35,7 +35,7 @@ public class CollectUnCollectNoteConsumer { private final RateLimiter rateLimiter = RateLimiter.create(5000); @Value("${rocketmq.name-server}") private String nameServer; - @Resource + private DefaultMQPushConsumer consumer; @Resource private NoteCollectionDOMapper noteCollectionDOMapper; diff --git a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/LikeUnlikeNoteConsumer.java b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/LikeUnlikeNoteConsumer.java index b8045a0..c7de6c3 100644 --- a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/LikeUnlikeNoteConsumer.java +++ b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/LikeUnlikeNoteConsumer.java @@ -35,7 +35,7 @@ public class LikeUnlikeNoteConsumer { private final RateLimiter rateLimiter = RateLimiter.create(5000); @Value("${rocketmq.name-server}") private String nameServer; - @Resource + private DefaultMQPushConsumer consumer; @Resource private NoteLikeDOMapper noteLikeDOMapper; diff --git a/http-client/gateApi.http b/http-client/gateApi.http index c5bc373..d91cf32 100644 --- a/http-client/gateApi.http +++ b/http-client/gateApi.http @@ -298,9 +298,8 @@ Authorization: Bearer {{token}} { "noteId": 1862481582414102549, - "content": "这是一条回复测试评论", - "imageUrl": "https://cdn.pixabay.com/photo/2025/10/05/15/06/autumn-9875155_1280.jpg", - "replyCommentId": 2001 + "content": "这是一条测试评论计数的评论111", + "imageUrl": "https://cdn.pixabay.com/photo/2025/10/05/15/06/autumn-9875155_1280.jpg" } ### 批量添加评论 diff --git a/sql/createTable.sql b/sql/createTable.sql index 029ce50..6da6eaa 100644 --- a/sql/createTable.sql +++ b/sql/createTable.sql @@ -280,4 +280,9 @@ CREATE TABLE `t_comment_like` DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci COMMENT ='评论点赞表'; +-- 表:t_comment表冗余字段 +alter table t_comment + add column `child_comment_total` bigint(20) unsigned DEFAULT '0' COMMENT '二级评论总数(只有一级评论才需要统计)'; + +