Compare commits
3 Commits
a37e76c87c
...
9ec330216f
| Author | SHA1 | Date | |
|---|---|---|---|
| 9ec330216f | |||
| 63495b4938 | |||
| f49d0e6b76 |
16
.idea/MyBatisCodeHelperDatasource.xml
generated
16
.idea/MyBatisCodeHelperDatasource.xml
generated
@@ -7,15 +7,13 @@
|
|||||||
<option name="customizedLombokAnnotation" value="true" />
|
<option name="customizedLombokAnnotation" value="true" />
|
||||||
<option name="customizedLombokValue" value="@lombok.Builder" />
|
<option name="customizedLombokValue" value="@lombok.Builder" />
|
||||||
<option name="deleteByPrimayKeyEnabled" value="false" />
|
<option name="deleteByPrimayKeyEnabled" value="false" />
|
||||||
<option name="generateService" value="true" />
|
|
||||||
<option name="generateServiceInterface" value="true" />
|
|
||||||
<option name="insertMethodEnabled" value="false" />
|
<option name="insertMethodEnabled" value="false" />
|
||||||
<option name="insertSelectiveMethodEnabled" value="false" />
|
<option name="insertSelectiveMethodEnabled" value="false" />
|
||||||
<option name="javaMapperPackage" value="com.hanserwei.hannote.comment.biz.domain.mapper" />
|
<option name="javaMapperPackage" value="com.hanserwei.hannote.count.biz.domain.mapper" />
|
||||||
<option name="javaMapperPath" value="$PROJECT_DIR$/han-note-comment/han-note-comment-biz/src/main/java" />
|
<option name="javaMapperPath" value="$PROJECT_DIR$/han-note-count/han-note-count-biz/src/main/java" />
|
||||||
<option name="javaModelPackage" value="com.hanserwei.hannote.comment.biz.domain.dataobject" />
|
<option name="javaModelPackage" value="com.hanserwei.hannote.count.biz.domain.dataobject" />
|
||||||
<option name="javaModelPath" value="$PROJECT_DIR$/han-note-comment/han-note-comment-biz/src/main/java" />
|
<option name="javaModelPath" value="$PROJECT_DIR$/han-note-count/han-note-count-biz/src/main/java" />
|
||||||
<option name="lastDatabaseCrudChooseModuleName" value="han-note-comment-biz" />
|
<option name="lastDatabaseCrudChooseModuleName" value="han-note-count-biz" />
|
||||||
<option name="lombokAllArgConstructor" value="true" />
|
<option name="lombokAllArgConstructor" value="true" />
|
||||||
<option name="lombokDataAnnotation" value="true" />
|
<option name="lombokDataAnnotation" value="true" />
|
||||||
<option name="lombokNoArgsConstructor" value="true" />
|
<option name="lombokNoArgsConstructor" value="true" />
|
||||||
@@ -146,7 +144,7 @@
|
|||||||
<option name="insertMethodEnabled" value="false" />
|
<option name="insertMethodEnabled" value="false" />
|
||||||
<option name="insertSelectiveMethodEnabled" value="false" />
|
<option name="insertSelectiveMethodEnabled" value="false" />
|
||||||
<option name="javaModelName" value="CommentDO" />
|
<option name="javaModelName" value="CommentDO" />
|
||||||
<option name="moduleName" value="han-note-comment-biz" />
|
<option name="moduleName" value="han-note-count-biz" />
|
||||||
<option name="mybatisplusIdType" value="AUTO" />
|
<option name="mybatisplusIdType" value="AUTO" />
|
||||||
<option name="selectByPrimaryKeyEnabled" value="false" />
|
<option name="selectByPrimaryKeyEnabled" value="false" />
|
||||||
<option name="sequenceColumn" value="" />
|
<option name="sequenceColumn" value="" />
|
||||||
@@ -429,7 +427,7 @@
|
|||||||
<option name="updateByPrimaykeyEnabled" value="false" />
|
<option name="updateByPrimaykeyEnabled" value="false" />
|
||||||
<option name="userMybatisPlus" value="true" />
|
<option name="userMybatisPlus" value="true" />
|
||||||
<option name="xmlMapperPackage" value="mapperxml" />
|
<option name="xmlMapperPackage" value="mapperxml" />
|
||||||
<option name="xmlMapperPath" value="$PROJECT_DIR$/han-note-comment/han-note-comment-biz/src/main/resources" />
|
<option name="xmlMapperPath" value="$PROJECT_DIR$/han-note-count/han-note-count-biz/src/main/resources" />
|
||||||
</ProjectProfile>
|
</ProjectProfile>
|
||||||
</option>
|
</option>
|
||||||
</component>
|
</component>
|
||||||
|
|||||||
@@ -7,4 +7,9 @@ public interface MQConstants {
|
|||||||
*/
|
*/
|
||||||
String TOPIC_PUBLISH_COMMENT = "PublishCommentTopic";
|
String TOPIC_PUBLISH_COMMENT = "PublishCommentTopic";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Topic: 笔记评论总数计数
|
||||||
|
*/
|
||||||
|
String TOPIC_COUNT_NOTE_COMMENT = "CountNoteCommentTopic";
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -10,6 +10,7 @@ import com.hanserwei.hannote.comment.biz.domain.dataobject.CommentDO;
|
|||||||
import com.hanserwei.hannote.comment.biz.domain.mapper.CommentDOMapper;
|
import com.hanserwei.hannote.comment.biz.domain.mapper.CommentDOMapper;
|
||||||
import com.hanserwei.hannote.comment.biz.enums.CommentLevelEnum;
|
import com.hanserwei.hannote.comment.biz.enums.CommentLevelEnum;
|
||||||
import com.hanserwei.hannote.comment.biz.model.bo.CommentBO;
|
import com.hanserwei.hannote.comment.biz.model.bo.CommentBO;
|
||||||
|
import com.hanserwei.hannote.comment.biz.model.dto.CountPublishCommentMqDTO;
|
||||||
import com.hanserwei.hannote.comment.biz.model.dto.PublishCommentMqDTO;
|
import com.hanserwei.hannote.comment.biz.model.dto.PublishCommentMqDTO;
|
||||||
import com.hanserwei.hannote.comment.biz.rpc.KeyValueRpcService;
|
import com.hanserwei.hannote.comment.biz.rpc.KeyValueRpcService;
|
||||||
import jakarta.annotation.PreDestroy;
|
import jakarta.annotation.PreDestroy;
|
||||||
@@ -20,11 +21,15 @@ import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
|||||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
||||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
||||||
import org.apache.rocketmq.client.exception.MQClientException;
|
import org.apache.rocketmq.client.exception.MQClientException;
|
||||||
|
import org.apache.rocketmq.client.producer.SendCallback;
|
||||||
|
import org.apache.rocketmq.client.producer.SendResult;
|
||||||
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
|
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
|
||||||
import org.apache.rocketmq.common.message.MessageExt;
|
import org.apache.rocketmq.common.message.MessageExt;
|
||||||
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
|
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
|
||||||
|
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.messaging.support.MessageBuilder;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import org.springframework.transaction.support.TransactionTemplate;
|
import org.springframework.transaction.support.TransactionTemplate;
|
||||||
|
|
||||||
@@ -48,6 +53,8 @@ public class Comment2DBConsumer {
|
|||||||
private TransactionTemplate transactionTemplate;
|
private TransactionTemplate transactionTemplate;
|
||||||
@Resource
|
@Resource
|
||||||
private KeyValueRpcService keyValueRpcService;
|
private KeyValueRpcService keyValueRpcService;
|
||||||
|
@Resource
|
||||||
|
private RocketMQTemplate rocketMQTemplate;
|
||||||
|
|
||||||
private DefaultMQPushConsumer consumer;
|
private DefaultMQPushConsumer consumer;
|
||||||
|
|
||||||
@@ -165,10 +172,10 @@ public class Comment2DBConsumer {
|
|||||||
|
|
||||||
log.info("## 清洗后的 CommentBOS: {}", JsonUtils.toJsonString(commentBOS));
|
log.info("## 清洗后的 CommentBOS: {}", JsonUtils.toJsonString(commentBOS));
|
||||||
// 编程式事务,保证整体操作的原子性
|
// 编程式事务,保证整体操作的原子性
|
||||||
transactionTemplate.execute(status -> {
|
Integer insertedRows = transactionTemplate.execute(status -> {
|
||||||
try {
|
try {
|
||||||
// 先批量存入评论元数据
|
// 先批量存入评论元数据
|
||||||
commentDOMapper.batchInsert(commentBOS);
|
int count = commentDOMapper.batchInsert(commentBOS);
|
||||||
|
|
||||||
// 过滤出评论内容不为空的 BO
|
// 过滤出评论内容不为空的 BO
|
||||||
List<CommentBO> commentContentNotEmptyBOS = commentBOS.stream()
|
List<CommentBO> commentContentNotEmptyBOS = commentBOS.stream()
|
||||||
@@ -179,7 +186,7 @@ public class Comment2DBConsumer {
|
|||||||
keyValueRpcService.batchSaveCommentContent(commentContentNotEmptyBOS);
|
keyValueRpcService.batchSaveCommentContent(commentContentNotEmptyBOS);
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return count;
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
status.setRollbackOnly(); // 标记事务为回滚
|
status.setRollbackOnly(); // 标记事务为回滚
|
||||||
log.error("", ex);
|
log.error("", ex);
|
||||||
@@ -187,6 +194,36 @@ public class Comment2DBConsumer {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// 如果批量插入的行数大于 0
|
||||||
|
if (Objects.nonNull(insertedRows) && insertedRows > 0) {
|
||||||
|
// 构建发送给计数服务的 DTO 集合
|
||||||
|
List<CountPublishCommentMqDTO> countPublishCommentMqDTOS = commentBOS.stream()
|
||||||
|
.map(commentBO -> CountPublishCommentMqDTO.builder()
|
||||||
|
.noteId(commentBO.getNoteId())
|
||||||
|
.commentId(commentBO.getId())
|
||||||
|
.level(commentBO.getLevel())
|
||||||
|
.parentId(commentBO.getParentId())
|
||||||
|
.build())
|
||||||
|
.toList();
|
||||||
|
|
||||||
|
// 异步发送计数 MQ
|
||||||
|
org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(JsonUtils.toJsonString(countPublishCommentMqDTOS))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// 异步发送 MQ 消息
|
||||||
|
rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_COMMENT, message, new SendCallback() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(SendResult sendResult) {
|
||||||
|
log.info("==> 【计数: 评论发布】MQ 发送成功,SendResult: {}", sendResult);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onException(Throwable throwable) {
|
||||||
|
log.error("==> 【计数: 评论发布】MQ 发送异常: ", throwable);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
// 手动 ACK,告诉 RocketMQ 这批次消息消费成功
|
// 手动 ACK,告诉 RocketMQ 这批次消息消费成功
|
||||||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|||||||
@@ -0,0 +1,34 @@
|
|||||||
|
package com.hanserwei.hannote.comment.biz.model.dto;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Builder;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@AllArgsConstructor
|
||||||
|
@NoArgsConstructor
|
||||||
|
@Builder
|
||||||
|
public class CountPublishCommentMqDTO {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 笔记 ID
|
||||||
|
*/
|
||||||
|
private Long noteId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 评论 ID
|
||||||
|
*/
|
||||||
|
private Long commentId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 评论级别
|
||||||
|
*/
|
||||||
|
private Integer level;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 父 ID
|
||||||
|
*/
|
||||||
|
private Long parentId;
|
||||||
|
|
||||||
|
}
|
||||||
@@ -2,6 +2,21 @@ package com.hanserwei.hannote.count.biz.constant;
|
|||||||
|
|
||||||
public interface MQConstants {
|
public interface MQConstants {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Topic: 笔记评论总数计数
|
||||||
|
*/
|
||||||
|
String TOPIC_COUNT_NOTE_COMMENT = "CountNoteCommentTopic";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Topic: 计数 - 笔记点赞数
|
||||||
|
*/
|
||||||
|
String TOPIC_LIKE_OR_UNLIKE = "LikeUnlikeTopic";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Topic: 笔记收藏、取消收藏
|
||||||
|
*/
|
||||||
|
String TOPIC_COLLECT_OR_UN_COLLECT = "CollectUnCollectTopic";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Topic: 关注数计数
|
* Topic: 关注数计数
|
||||||
*/
|
*/
|
||||||
@@ -22,21 +37,11 @@ public interface MQConstants {
|
|||||||
*/
|
*/
|
||||||
String TOPIC_COUNT_FOLLOWING_2_DB = "CountFollowing2DBTopic";
|
String TOPIC_COUNT_FOLLOWING_2_DB = "CountFollowing2DBTopic";
|
||||||
|
|
||||||
/**
|
|
||||||
* Topic: 计数 - 笔记点赞数
|
|
||||||
*/
|
|
||||||
String TOPIC_COUNT_NOTE_LIKE = "CountNoteLikeTopic";
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Topic: 计数 - 笔记点赞数落库
|
* Topic: 计数 - 笔记点赞数落库
|
||||||
*/
|
*/
|
||||||
String TOPIC_COUNT_NOTE_LIKE_2_DB = "CountNoteLike2DBTTopic";
|
String TOPIC_COUNT_NOTE_LIKE_2_DB = "CountNoteLike2DBTTopic";
|
||||||
|
|
||||||
/**
|
|
||||||
* Topic: 计数 - 笔记收藏数
|
|
||||||
*/
|
|
||||||
String TOPIC_COUNT_NOTE_COLLECT = "CountNoteCollectTopic";
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Topic: 计数 - 笔记收藏数落库
|
* Topic: 计数 - 笔记收藏数落库
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -0,0 +1,80 @@
|
|||||||
|
package com.hanserwei.hannote.count.biz.consumer;
|
||||||
|
|
||||||
|
import cn.hutool.core.collection.CollUtil;
|
||||||
|
import com.github.phantomthief.collection.BufferTrigger;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.hanserwei.framework.common.utils.JsonUtils;
|
||||||
|
import com.hanserwei.hannote.count.biz.constant.MQConstants;
|
||||||
|
import com.hanserwei.hannote.count.biz.domain.mapper.CommentDOMapper;
|
||||||
|
import com.hanserwei.hannote.count.biz.enums.CommentLevelEnum;
|
||||||
|
import com.hanserwei.hannote.count.biz.model.dto.CountPublishCommentMqDTO;
|
||||||
|
import jakarta.annotation.Resource;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||||
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@RocketMQMessageListener(consumerGroup = "han_note_group_child_comment_total" + MQConstants.TOPIC_COUNT_NOTE_COMMENT, // Group 组
|
||||||
|
topic = MQConstants.TOPIC_COUNT_NOTE_COMMENT // 主题 Topic
|
||||||
|
)
|
||||||
|
@Slf4j
|
||||||
|
public class CountNoteChildCommentConsumer implements RocketMQListener<String> {
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private CommentDOMapper commentDOMapper;
|
||||||
|
|
||||||
|
private final BufferTrigger<String> bufferTrigger = BufferTrigger.<String>batchBlocking()
|
||||||
|
.bufferSize(50000) // 缓存队列的最大容量
|
||||||
|
.batchSize(1000) // 一批次最多聚合 1000 条
|
||||||
|
.linger(Duration.ofSeconds(1)) // 多久聚合一次(1s 一次)
|
||||||
|
.setConsumerEx(this::consumeMessage) // 设置消费者方法
|
||||||
|
.build();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onMessage(String body) {
|
||||||
|
// 往 bufferTrigger 中添加元素
|
||||||
|
bufferTrigger.enqueue(body);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void consumeMessage(List<String> bodys) {
|
||||||
|
log.info("==> 【笔记二级评论数】聚合消息, size: {}", bodys.size());
|
||||||
|
log.info("==> 【笔记二级评论数】聚合消息, {}", JsonUtils.toJsonString(bodys));
|
||||||
|
|
||||||
|
// 将聚合后的消息体 Json 转 List<CountPublishCommentMqDTO>
|
||||||
|
List<CountPublishCommentMqDTO> countPublishCommentMqDTOList = Lists.newArrayList();
|
||||||
|
bodys.forEach(body -> {
|
||||||
|
try {
|
||||||
|
List<CountPublishCommentMqDTO> list = JsonUtils.parseList(body, CountPublishCommentMqDTO.class);
|
||||||
|
countPublishCommentMqDTOList.addAll(list);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// 过滤出二级评论,并按 parent_id 分组
|
||||||
|
Map<Long, List<CountPublishCommentMqDTO>> groupMap = countPublishCommentMqDTOList.stream()
|
||||||
|
.filter(commentMqDTO -> Objects.equals(CommentLevelEnum.TWO.getCode(), commentMqDTO.getLevel()))
|
||||||
|
.collect(Collectors.groupingBy(CountPublishCommentMqDTO::getParentId)); // 按 parent_id 分组
|
||||||
|
|
||||||
|
// 若无二级评论,则直接 return
|
||||||
|
if (CollUtil.isEmpty(groupMap)) return;
|
||||||
|
|
||||||
|
// 循环分组字典
|
||||||
|
for (Map.Entry<Long, List<CountPublishCommentMqDTO>> entry : groupMap.entrySet()) {
|
||||||
|
// 一级评论 ID
|
||||||
|
Long parentId = entry.getKey();
|
||||||
|
// 评论数
|
||||||
|
int count = CollUtil.size(entry.getValue());
|
||||||
|
|
||||||
|
// 更新一级评论的下级评论总数,进行累加操作
|
||||||
|
commentDOMapper.updateChildCommentTotal(parentId, count);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -29,8 +29,8 @@ import java.util.stream.Collectors;
|
|||||||
@Component
|
@Component
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@RocketMQMessageListener(
|
@RocketMQMessageListener(
|
||||||
consumerGroup = "han_note_group_" + MQConstants.TOPIC_COUNT_NOTE_COLLECT,
|
consumerGroup = "han_note_group_" + MQConstants.TOPIC_COLLECT_OR_UN_COLLECT,
|
||||||
topic = MQConstants.TOPIC_COUNT_NOTE_COLLECT
|
topic = MQConstants.TOPIC_COLLECT_OR_UN_COLLECT
|
||||||
)
|
)
|
||||||
public class CountNoteCollectConsumer implements RocketMQListener<String> {
|
public class CountNoteCollectConsumer implements RocketMQListener<String> {
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,76 @@
|
|||||||
|
package com.hanserwei.hannote.count.biz.consumer;
|
||||||
|
|
||||||
|
import cn.hutool.core.collection.CollUtil;
|
||||||
|
import com.github.phantomthief.collection.BufferTrigger;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.hanserwei.framework.common.utils.JsonUtils;
|
||||||
|
import com.hanserwei.hannote.count.biz.constant.MQConstants;
|
||||||
|
import com.hanserwei.hannote.count.biz.domain.mapper.NoteCountDOMapper;
|
||||||
|
import com.hanserwei.hannote.count.biz.model.dto.CountPublishCommentMqDTO;
|
||||||
|
import jakarta.annotation.Resource;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||||
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@RocketMQMessageListener(consumerGroup = "han_note_group_" + MQConstants.TOPIC_COUNT_NOTE_COMMENT, // Group 组
|
||||||
|
topic = MQConstants.TOPIC_COUNT_NOTE_COMMENT // 主题 Topic
|
||||||
|
)
|
||||||
|
@Slf4j
|
||||||
|
public class CountNoteCommentConsumer implements RocketMQListener<String> {
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private NoteCountDOMapper noteCountDOMapper;
|
||||||
|
|
||||||
|
private final BufferTrigger<String> bufferTrigger = BufferTrigger.<String>batchBlocking()
|
||||||
|
.bufferSize(50000) // 缓存队列的最大容量
|
||||||
|
.batchSize(1000) // 一批次最多聚合 1000 条
|
||||||
|
.linger(Duration.ofSeconds(1)) // 多久聚合一次(1s 一次)
|
||||||
|
.setConsumerEx(this::consumeMessage) // 设置消费者方法
|
||||||
|
.build();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onMessage(String body) {
|
||||||
|
// 往 bufferTrigger 中添加元素
|
||||||
|
bufferTrigger.enqueue(body);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void consumeMessage(List<String> bodys) {
|
||||||
|
log.info("==> 【笔记评论数】聚合消息, size: {}", bodys.size());
|
||||||
|
log.info("==> 【笔记评论数】聚合消息, {}", JsonUtils.toJsonString(bodys));
|
||||||
|
|
||||||
|
// 将聚合后的消息体 Json 转 List<CountPublishCommentMqDTO>
|
||||||
|
List<CountPublishCommentMqDTO> countPublishCommentMqDTOList = Lists.newArrayList();
|
||||||
|
bodys.forEach(body -> {
|
||||||
|
try {
|
||||||
|
List<CountPublishCommentMqDTO> list = JsonUtils.parseList(body, CountPublishCommentMqDTO.class);
|
||||||
|
countPublishCommentMqDTOList.addAll(list);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// 按笔记 ID 进行分组
|
||||||
|
Map<Long, List<CountPublishCommentMqDTO>> groupMap = countPublishCommentMqDTOList.stream()
|
||||||
|
.collect(Collectors.groupingBy(CountPublishCommentMqDTO::getNoteId));
|
||||||
|
|
||||||
|
// 循环分组字典
|
||||||
|
for (Map.Entry<Long, List<CountPublishCommentMqDTO>> entry : groupMap.entrySet()) {
|
||||||
|
// 笔记 ID
|
||||||
|
Long noteId = entry.getKey();
|
||||||
|
// 评论数
|
||||||
|
int count = CollUtil.size(entry.getValue());
|
||||||
|
|
||||||
|
// 若评论数大于零,则执行更新操作:累加评论总数
|
||||||
|
if (count > 0) {
|
||||||
|
noteCountDOMapper.insertOrUpdateCommentTotalByNoteId(count, noteId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -28,8 +28,8 @@ import java.util.stream.Collectors;
|
|||||||
@Component
|
@Component
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@RocketMQMessageListener(
|
@RocketMQMessageListener(
|
||||||
consumerGroup = "han_note_group_" + MQConstants.TOPIC_COUNT_NOTE_LIKE,
|
consumerGroup = "han_note_group_" + MQConstants.TOPIC_LIKE_OR_UNLIKE,
|
||||||
topic = MQConstants.TOPIC_COUNT_NOTE_LIKE
|
topic = MQConstants.TOPIC_LIKE_OR_UNLIKE
|
||||||
)
|
)
|
||||||
public class CountNoteLikeConsumer implements RocketMQListener<String> {
|
public class CountNoteLikeConsumer implements RocketMQListener<String> {
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,118 @@
|
|||||||
|
package com.hanserwei.hannote.count.biz.domain.dataobject;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.annotation.IdType;
|
||||||
|
import com.baomidou.mybatisplus.annotation.TableField;
|
||||||
|
import com.baomidou.mybatisplus.annotation.TableId;
|
||||||
|
import com.baomidou.mybatisplus.annotation.TableName;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Builder;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 评论表
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@Builder
|
||||||
|
@AllArgsConstructor
|
||||||
|
@NoArgsConstructor
|
||||||
|
@TableName(value = "t_comment")
|
||||||
|
public class CommentDO {
|
||||||
|
/**
|
||||||
|
* id
|
||||||
|
*/
|
||||||
|
@TableId(value = "id", type = IdType.AUTO)
|
||||||
|
private Long id;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 关联的笔记ID
|
||||||
|
*/
|
||||||
|
@TableField(value = "note_id")
|
||||||
|
private Long noteId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发布者用户ID
|
||||||
|
*/
|
||||||
|
@TableField(value = "user_id")
|
||||||
|
private Long userId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 评论内容UUID
|
||||||
|
*/
|
||||||
|
@TableField(value = "content_uuid")
|
||||||
|
private String contentUuid;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 内容是否为空(0:不为空 1:为空)
|
||||||
|
*/
|
||||||
|
@TableField(value = "is_content_empty")
|
||||||
|
private Boolean isContentEmpty;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 评论附加图片URL
|
||||||
|
*/
|
||||||
|
@TableField(value = "image_url")
|
||||||
|
private String imageUrl;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 级别(1:一级评论 2:二级评论)
|
||||||
|
*/
|
||||||
|
@TableField(value = "`level`")
|
||||||
|
private Integer level;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 评论被回复次数,仅一级评论需要
|
||||||
|
*/
|
||||||
|
@TableField(value = "reply_total")
|
||||||
|
private Long replyTotal;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 评论被点赞次数
|
||||||
|
*/
|
||||||
|
@TableField(value = "like_total")
|
||||||
|
private Long likeTotal;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 父ID (若是对笔记的评论,则此字段存储笔记ID; 若是二级评论,则此字段存储一级评论的ID)
|
||||||
|
*/
|
||||||
|
@TableField(value = "parent_id")
|
||||||
|
private Long parentId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 回复哪个的评论 (0表示是对笔记的评论,若是对他人评论的回复,则存储回复评论的ID)
|
||||||
|
*/
|
||||||
|
@TableField(value = "reply_comment_id")
|
||||||
|
private Long replyCommentId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 回复的哪个用户, 存储用户ID
|
||||||
|
*/
|
||||||
|
@TableField(value = "reply_user_id")
|
||||||
|
private Long replyUserId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 是否置顶(0:不置顶 1:置顶)
|
||||||
|
*/
|
||||||
|
@TableField(value = "is_top")
|
||||||
|
private Boolean isTop;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建时间
|
||||||
|
*/
|
||||||
|
@TableField(value = "create_time")
|
||||||
|
private LocalDateTime createTime;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 更新时间
|
||||||
|
*/
|
||||||
|
@TableField(value = "update_time")
|
||||||
|
private LocalDateTime updateTime;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 二级评论总数(只有一级评论才需要统计)
|
||||||
|
*/
|
||||||
|
@TableField(value = "child_comment_total")
|
||||||
|
private Long childCommentTotal;
|
||||||
|
}
|
||||||
@@ -0,0 +1,19 @@
|
|||||||
|
package com.hanserwei.hannote.count.biz.domain.mapper;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||||
|
import com.hanserwei.hannote.count.biz.domain.dataobject.CommentDO;
|
||||||
|
import org.apache.ibatis.annotations.Mapper;
|
||||||
|
import org.apache.ibatis.annotations.Param;
|
||||||
|
|
||||||
|
@Mapper
|
||||||
|
public interface CommentDOMapper extends BaseMapper<CommentDO> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 更新一级评论的子评论总数
|
||||||
|
*
|
||||||
|
* @param parentId 一级评论 ID
|
||||||
|
* @param count 子评论数
|
||||||
|
* @return 更新结果
|
||||||
|
*/
|
||||||
|
int updateChildCommentTotal(@Param("parentId") Long parentId, @Param("count") int count);
|
||||||
|
}
|
||||||
@@ -25,4 +25,13 @@ public interface NoteCountDOMapper extends BaseMapper<NoteCountDO> {
|
|||||||
* @return 影响行数
|
* @return 影响行数
|
||||||
*/
|
*/
|
||||||
int insertOrUpdateCollectTotalByNoteId(@Param("count") Integer count, @Param("noteId") Long noteId);
|
int insertOrUpdateCollectTotalByNoteId(@Param("count") Integer count, @Param("noteId") Long noteId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 添加记录或更新笔记评论数
|
||||||
|
*
|
||||||
|
* @param count 评论数
|
||||||
|
* @param noteId 笔记ID
|
||||||
|
* @return 影响行数
|
||||||
|
*/
|
||||||
|
int insertOrUpdateCommentTotalByNoteId(@Param("count") int count, @Param("noteId") Long noteId);
|
||||||
}
|
}
|
||||||
@@ -0,0 +1,17 @@
|
|||||||
|
package com.hanserwei.hannote.count.biz.enums;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Getter;
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
@AllArgsConstructor
|
||||||
|
public enum CommentLevelEnum {
|
||||||
|
// 一级评论
|
||||||
|
ONE(1),
|
||||||
|
// 二级评论
|
||||||
|
TWO(2),
|
||||||
|
;
|
||||||
|
|
||||||
|
private final Integer code;
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,34 @@
|
|||||||
|
package com.hanserwei.hannote.count.biz.model.dto;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Builder;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@AllArgsConstructor
|
||||||
|
@NoArgsConstructor
|
||||||
|
@Builder
|
||||||
|
public class CountPublishCommentMqDTO {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 笔记 ID
|
||||||
|
*/
|
||||||
|
private Long noteId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 评论 ID
|
||||||
|
*/
|
||||||
|
private Long commentId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 评论级别
|
||||||
|
*/
|
||||||
|
private Integer level;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 父 ID
|
||||||
|
*/
|
||||||
|
private Long parentId;
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,51 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
||||||
|
<mapper namespace="com.hanserwei.hannote.count.biz.domain.mapper.CommentDOMapper">
|
||||||
|
<resultMap id="BaseResultMap" type="com.hanserwei.hannote.count.biz.domain.dataobject.CommentDO">
|
||||||
|
<!--@mbg.generated-->
|
||||||
|
<!--@Table t_comment-->
|
||||||
|
<id column="id" jdbcType="BIGINT" property="id"/>
|
||||||
|
<result column="note_id" jdbcType="BIGINT" property="noteId"/>
|
||||||
|
<result column="user_id" jdbcType="BIGINT" property="userId"/>
|
||||||
|
<result column="content_uuid" jdbcType="VARCHAR" property="contentUuid"/>
|
||||||
|
<result column="is_content_empty" jdbcType="BIT" property="isContentEmpty"/>
|
||||||
|
<result column="image_url" jdbcType="VARCHAR" property="imageUrl"/>
|
||||||
|
<result column="level" jdbcType="TINYINT" property="level"/>
|
||||||
|
<result column="reply_total" jdbcType="BIGINT" property="replyTotal"/>
|
||||||
|
<result column="like_total" jdbcType="BIGINT" property="likeTotal"/>
|
||||||
|
<result column="parent_id" jdbcType="BIGINT" property="parentId"/>
|
||||||
|
<result column="reply_comment_id" jdbcType="BIGINT" property="replyCommentId"/>
|
||||||
|
<result column="reply_user_id" jdbcType="BIGINT" property="replyUserId"/>
|
||||||
|
<result column="is_top" jdbcType="TINYINT" property="isTop"/>
|
||||||
|
<result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
|
||||||
|
<result column="update_time" jdbcType="TIMESTAMP" property="updateTime"/>
|
||||||
|
<result column="child_comment_total" jdbcType="BIGINT" property="childCommentTotal"/>
|
||||||
|
</resultMap>
|
||||||
|
<sql id="Base_Column_List">
|
||||||
|
<!--@mbg.generated-->
|
||||||
|
id,
|
||||||
|
note_id,
|
||||||
|
user_id,
|
||||||
|
content_uuid,
|
||||||
|
is_content_empty,
|
||||||
|
image_url,
|
||||||
|
`level`,
|
||||||
|
reply_total,
|
||||||
|
like_total,
|
||||||
|
parent_id,
|
||||||
|
reply_comment_id,
|
||||||
|
reply_user_id,
|
||||||
|
is_top,
|
||||||
|
create_time,
|
||||||
|
update_time,
|
||||||
|
child_comment_total
|
||||||
|
</sql>
|
||||||
|
|
||||||
|
<update id="updateChildCommentTotal" parameterType="map">
|
||||||
|
update t_comment
|
||||||
|
set child_comment_total = child_comment_total + #{count},
|
||||||
|
update_time = now()
|
||||||
|
where id = #{parentId}
|
||||||
|
and level = 1
|
||||||
|
</update>
|
||||||
|
</mapper>
|
||||||
@@ -26,4 +26,10 @@
|
|||||||
VALUES (#{noteId}, #{count})
|
VALUES (#{noteId}, #{count})
|
||||||
ON DUPLICATE KEY UPDATE collect_total = collect_total + (#{count});
|
ON DUPLICATE KEY UPDATE collect_total = collect_total + (#{count});
|
||||||
</insert>
|
</insert>
|
||||||
|
|
||||||
|
<insert id="insertOrUpdateCommentTotalByNoteId" parameterType="map">
|
||||||
|
INSERT INTO t_note_count (note_id, comment_total)
|
||||||
|
VALUES (#{noteId}, #{count})
|
||||||
|
ON DUPLICATE KEY UPDATE comment_total = comment_total + (#{count});
|
||||||
|
</insert>
|
||||||
</mapper>
|
</mapper>
|
||||||
@@ -118,6 +118,13 @@
|
|||||||
<artifactId>rocketmq-spring-boot-starter</artifactId>
|
<artifactId>rocketmq-spring-boot-starter</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<!-- Rocket MQ 客户端 -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.rocketmq</groupId>
|
||||||
|
<artifactId>rocketmq-client</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,172 +1,154 @@
|
|||||||
package com.hanserwei.hannote.note.biz.comsumer;
|
package com.hanserwei.hannote.note.biz.comsumer;
|
||||||
|
|
||||||
|
import cn.hutool.core.collection.CollUtil;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.util.concurrent.RateLimiter;
|
import com.google.common.util.concurrent.RateLimiter;
|
||||||
import com.hanserwei.framework.common.utils.JsonUtils;
|
import com.hanserwei.framework.common.utils.JsonUtils;
|
||||||
import com.hanserwei.hannote.note.biz.constant.MQConstants;
|
import com.hanserwei.hannote.note.biz.constant.MQConstants;
|
||||||
import com.hanserwei.hannote.note.biz.domain.dataobject.NoteCollectionDO;
|
import com.hanserwei.hannote.note.biz.domain.dataobject.NoteCollectionDO;
|
||||||
import com.hanserwei.hannote.note.biz.domain.mapper.NoteCollectionDOMapper;
|
import com.hanserwei.hannote.note.biz.domain.mapper.NoteCollectionDOMapper;
|
||||||
import com.hanserwei.hannote.note.biz.model.dto.CollectUnCollectNoteMqDTO;
|
import com.hanserwei.hannote.note.biz.model.dto.CollectUnCollectNoteMqDTO;
|
||||||
|
import jakarta.annotation.PreDestroy;
|
||||||
import jakarta.annotation.Resource;
|
import jakarta.annotation.Resource;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.rocketmq.client.producer.SendCallback;
|
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||||
import org.apache.rocketmq.client.producer.SendResult;
|
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
|
||||||
import org.apache.rocketmq.common.message.Message;
|
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
|
||||||
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
import org.apache.rocketmq.client.exception.MQClientException;
|
||||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
|
||||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
|
||||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.messaging.support.MessageBuilder;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@SuppressWarnings({"UnstableApiUsage", "DuplicatedCode"})
|
@SuppressWarnings({"UnstableApiUsage"})
|
||||||
@Component
|
@Component
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@RocketMQMessageListener(
|
public class CollectUnCollectNoteConsumer {
|
||||||
consumerGroup = "han_note_group_" + MQConstants.TOPIC_COLLECT_OR_UN_COLLECT,
|
|
||||||
topic = MQConstants.TOPIC_COLLECT_OR_UN_COLLECT,
|
|
||||||
consumeMode = ConsumeMode.ORDERLY
|
|
||||||
)
|
|
||||||
public class CollectUnCollectNoteConsumer implements RocketMQListener<Message> {
|
|
||||||
|
|
||||||
// 每秒创建 5000 个令牌
|
// 每秒创建5000个令牌
|
||||||
private final RateLimiter rateLimiter = RateLimiter.create(5000);
|
private final RateLimiter rateLimiter = RateLimiter.create(5000);
|
||||||
|
@Value("${rocketmq.name-server}")
|
||||||
|
private String nameServer;
|
||||||
|
|
||||||
|
private DefaultMQPushConsumer consumer;
|
||||||
@Resource
|
@Resource
|
||||||
private NoteCollectionDOMapper noteCollectionDOMapper;
|
private NoteCollectionDOMapper noteCollectionDOMapper;
|
||||||
@Resource
|
|
||||||
private RocketMQTemplate rocketMQTemplate;
|
|
||||||
|
|
||||||
@Override
|
@Bean(name = "CollectUnCollectNoteConsumer")
|
||||||
public void onMessage(Message message) {
|
public DefaultMQPushConsumer mqPushConsumer() throws MQClientException {
|
||||||
// 流量削峰:通过获取令牌,如果没有令牌可用,将阻塞,直到获得
|
// Group组
|
||||||
|
String group = "han_note_group_" + MQConstants.TOPIC_COLLECT_OR_UN_COLLECT;
|
||||||
|
|
||||||
|
// 创建一个新的 DefaultMQPushConsumer 实例,并指定消费者的消费组名
|
||||||
|
consumer = new DefaultMQPushConsumer(group);
|
||||||
|
|
||||||
|
// 设置 RocketMQ 的 NameServer 地址
|
||||||
|
consumer.setNamesrvAddr(nameServer);
|
||||||
|
|
||||||
|
// 订阅指定的主题,并设置主题的订阅规则("*" 表示订阅所有标签的消息)
|
||||||
|
consumer.subscribe(MQConstants.TOPIC_COLLECT_OR_UN_COLLECT, "*");
|
||||||
|
|
||||||
|
// 设置消费者消费消息的起始位置,如果队列中没有消息,则从最新的消息开始消费。
|
||||||
|
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
|
||||||
|
|
||||||
|
// 设置消息消费模式,这里使用集群模式 (CLUSTERING)
|
||||||
|
consumer.setMessageModel(MessageModel.CLUSTERING);
|
||||||
|
|
||||||
|
// 最大重试次数, 以防消息重试过多次仍然没有成功,避免消息卡在消费队列中。
|
||||||
|
consumer.setMaxReconsumeTimes(3);
|
||||||
|
// 设置每批次消费的最大消息数量,这里设置为 30,表示每次拉取时最多消费 30 条消息。
|
||||||
|
consumer.setConsumeMessageBatchMaxSize(30);
|
||||||
|
// 设置拉取间隔, 单位毫秒
|
||||||
|
consumer.setPullInterval(1000);
|
||||||
|
|
||||||
|
// 注册消息监听器
|
||||||
|
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
|
||||||
|
log.info("==> 【笔记收藏、取消收藏】本批次消息大小: {}", msgs.size());
|
||||||
|
try {
|
||||||
|
// 令牌桶流控, 以控制数据库能够承受的 QPS
|
||||||
rateLimiter.acquire();
|
rateLimiter.acquire();
|
||||||
|
|
||||||
// 幂等性: 通过联合唯一索引保证
|
// 幂等性: 通过联合唯一索引保证
|
||||||
|
|
||||||
// 消息体
|
// 消息体 Json 字符串转 DTO
|
||||||
String bodyJsonStr = new String(message.getBody());
|
List<CollectUnCollectNoteMqDTO> collectUnCollectNoteMqDTOS = Lists.newArrayList();
|
||||||
// 标签
|
msgs.forEach(msg -> {
|
||||||
String tags = message.getTags();
|
String msgJson = new String(msg.getBody());
|
||||||
|
log.info("==> Consumer - Received message: {}", msgJson);
|
||||||
|
collectUnCollectNoteMqDTOS.add(JsonUtils.parseObject(msgJson, CollectUnCollectNoteMqDTO.class));
|
||||||
|
});
|
||||||
|
|
||||||
log.info("==> CollectUnCollectNoteConsumer 消费了消息 {}, tags: {}", bodyJsonStr, tags);
|
// 1.内存级操作合并
|
||||||
|
//按用户ID分组
|
||||||
|
Map<Long, List<CollectUnCollectNoteMqDTO>> groupMap = collectUnCollectNoteMqDTOS.stream()
|
||||||
|
.collect(Collectors.groupingBy(CollectUnCollectNoteMqDTO::getUserId));
|
||||||
|
//对每个用户按照用户ID分组并且过滤合并
|
||||||
|
// 对每个用户的操作按 noteId 二次分组,并过滤合并
|
||||||
|
List<CollectUnCollectNoteMqDTO> finalOperations = groupMap.values().stream()
|
||||||
|
.flatMap(userOperations -> {
|
||||||
|
// 按 noteId 分组
|
||||||
|
Map<Long, List<CollectUnCollectNoteMqDTO>> noteGroupMap = userOperations.stream()
|
||||||
|
.collect(Collectors.groupingBy(CollectUnCollectNoteMqDTO::getNoteId));
|
||||||
|
|
||||||
// 根据 MQ 标签,判断操作类型
|
// 处理每个 noteId 的分组
|
||||||
if (Objects.equals(tags, MQConstants.TAG_COLLECT)) { // 收藏笔记
|
// 取最后一次操作(消息是有序的)
|
||||||
handleCollectNoteTagMessage(bodyJsonStr);
|
return noteGroupMap.values().stream()
|
||||||
} else if (Objects.equals(tags, MQConstants.TAG_UN_COLLECT)) { // 取消收藏笔记
|
.filter(operations -> {
|
||||||
handleUnCollectNoteTagMessage(bodyJsonStr);
|
int size = operations.size();
|
||||||
}
|
// 根据奇偶性判断是否需要处理
|
||||||
|
// 偶数次操作:最终状态抵消,无需写入
|
||||||
|
// 奇数次操作:保留最后一次操作
|
||||||
|
return size % 2 != 0;
|
||||||
|
})
|
||||||
|
.map(List::getLast);
|
||||||
|
})
|
||||||
|
.toList();
|
||||||
|
|
||||||
|
// 2. 批量写入数据库
|
||||||
|
if (CollUtil.isNotEmpty(finalOperations)) {
|
||||||
|
// DTO 转 DO
|
||||||
|
List<NoteCollectionDO> noteCollectionDOS = finalOperations.stream()
|
||||||
|
.map(finalOperation -> NoteCollectionDO.builder()
|
||||||
|
.userId(finalOperation.getUserId())
|
||||||
|
.noteId(finalOperation.getNoteId())
|
||||||
|
.createTime(finalOperation.getCreateTime())
|
||||||
|
.status(finalOperation.getType())
|
||||||
|
.build())
|
||||||
|
.toList();
|
||||||
|
|
||||||
|
// 批量写入
|
||||||
|
noteCollectionDOMapper.batchInsertOrUpdate(noteCollectionDOS);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* 处理取消收藏笔记的 MQ 消息
|
|
||||||
*
|
|
||||||
* @param bodyJsonStr 消息体
|
|
||||||
*/
|
|
||||||
private void handleUnCollectNoteTagMessage(String bodyJsonStr) {
|
|
||||||
// 消息体 JSON 字符串转 DTO
|
|
||||||
CollectUnCollectNoteMqDTO unCollectNoteMqDTO = JsonUtils.parseObject(bodyJsonStr, CollectUnCollectNoteMqDTO.class);
|
|
||||||
|
|
||||||
if (Objects.isNull(unCollectNoteMqDTO)) return;
|
// 手动 ACK,告诉 RocketMQ 这批次消息消费成功
|
||||||
|
return ConsumeOrderlyStatus.SUCCESS;
|
||||||
// 用户ID
|
} catch (Exception e) {
|
||||||
Long userId = unCollectNoteMqDTO.getUserId();
|
log.error("", e);
|
||||||
// 收藏的笔记ID
|
// 这样 RocketMQ 会暂停当前队列的消费一段时间,再重试
|
||||||
Long noteId = unCollectNoteMqDTO.getNoteId();
|
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
|
||||||
// 操作类型
|
|
||||||
Integer type = unCollectNoteMqDTO.getType();
|
|
||||||
// 收藏时间
|
|
||||||
LocalDateTime createTime = unCollectNoteMqDTO.getCreateTime();
|
|
||||||
|
|
||||||
// 构建 DO 对象
|
|
||||||
NoteCollectionDO noteCollectionDO = NoteCollectionDO.builder()
|
|
||||||
.userId(userId)
|
|
||||||
.noteId(noteId)
|
|
||||||
.createTime(createTime)
|
|
||||||
.status(type)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
// 取消收藏:记录更新
|
|
||||||
int count = noteCollectionDOMapper.update2UnCollectByUserIdAndNoteId(noteCollectionDO);
|
|
||||||
|
|
||||||
if (count == 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 更新数据库成功后,发送计数 MQ
|
|
||||||
org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(bodyJsonStr)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
// 异步发送 MQ 消息
|
|
||||||
rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_COLLECT, message, new SendCallback() {
|
|
||||||
@Override
|
|
||||||
public void onSuccess(SendResult sendResult) {
|
|
||||||
log.info("==> 【计数: 笔记取消收藏】MQ 发送成功,SendResult: {}", sendResult);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onException(Throwable throwable) {
|
|
||||||
log.error("==> 【计数: 笔记取消收藏】MQ 发送异常: ", throwable);
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// 启动消费者
|
||||||
|
consumer.start();
|
||||||
|
return consumer;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@PreDestroy
|
||||||
* 处理收藏笔记的 MQ 消息
|
public void destroy() {
|
||||||
*
|
if (Objects.nonNull(consumer)) {
|
||||||
* @param bodyJsonStr 消息体
|
try {
|
||||||
*/
|
consumer.shutdown(); // 关闭消费者
|
||||||
private void handleCollectNoteTagMessage(String bodyJsonStr) {
|
} catch (Exception e) {
|
||||||
// 消息体 JSON 字符串转 DTO
|
log.error("", e);
|
||||||
CollectUnCollectNoteMqDTO collectUnCollectNoteMqDTO = JsonUtils.parseObject(bodyJsonStr, CollectUnCollectNoteMqDTO.class);
|
|
||||||
|
|
||||||
if (Objects.isNull(collectUnCollectNoteMqDTO)) return;
|
|
||||||
|
|
||||||
// 用户ID
|
|
||||||
Long userId = collectUnCollectNoteMqDTO.getUserId();
|
|
||||||
// 收藏的笔记ID
|
|
||||||
Long noteId = collectUnCollectNoteMqDTO.getNoteId();
|
|
||||||
// 操作类型
|
|
||||||
Integer type = collectUnCollectNoteMqDTO.getType();
|
|
||||||
// 收藏时间
|
|
||||||
LocalDateTime createTime = collectUnCollectNoteMqDTO.getCreateTime();
|
|
||||||
|
|
||||||
// 构建 DO 对象
|
|
||||||
NoteCollectionDO noteCollectionDO = NoteCollectionDO.builder()
|
|
||||||
.userId(userId)
|
|
||||||
.noteId(noteId)
|
|
||||||
.createTime(createTime)
|
|
||||||
.status(type)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
// 添加或更新笔记收藏记录
|
|
||||||
boolean isSuccess = noteCollectionDOMapper.insertOrUpdate(noteCollectionDO);
|
|
||||||
|
|
||||||
if (!isSuccess) {
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 发送计数 MQ
|
|
||||||
|
|
||||||
// 更新数据库成功后,发送计数 MQ
|
|
||||||
org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(bodyJsonStr)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
// 异步发送 MQ 消息
|
|
||||||
// 异步发送 MQ 消息
|
|
||||||
rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_COLLECT, message, new SendCallback() {
|
|
||||||
@Override
|
|
||||||
public void onSuccess(SendResult sendResult) {
|
|
||||||
log.info("==> 【计数: 笔记收藏】MQ 发送成功,SendResult: {}", sendResult);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onException(Throwable throwable) {
|
|
||||||
log.error("==> 【计数: 笔记收藏】MQ 发送异常: ", throwable);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,179 +1,154 @@
|
|||||||
package com.hanserwei.hannote.note.biz.comsumer;
|
package com.hanserwei.hannote.note.biz.comsumer;
|
||||||
|
|
||||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import cn.hutool.core.collection.CollUtil;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.util.concurrent.RateLimiter;
|
import com.google.common.util.concurrent.RateLimiter;
|
||||||
import com.hanserwei.framework.common.utils.JsonUtils;
|
import com.hanserwei.framework.common.utils.JsonUtils;
|
||||||
import com.hanserwei.hannote.note.biz.constant.MQConstants;
|
import com.hanserwei.hannote.note.biz.constant.MQConstants;
|
||||||
import com.hanserwei.hannote.note.biz.domain.dataobject.NoteLikeDO;
|
import com.hanserwei.hannote.note.biz.domain.dataobject.NoteLikeDO;
|
||||||
import com.hanserwei.hannote.note.biz.domain.mapper.NoteLikeDOMapper;
|
import com.hanserwei.hannote.note.biz.domain.mapper.NoteLikeDOMapper;
|
||||||
import com.hanserwei.hannote.note.biz.enums.LikeUnlikeNoteTypeEnum;
|
|
||||||
import com.hanserwei.hannote.note.biz.model.dto.LikeUnlikeNoteMqDTO;
|
import com.hanserwei.hannote.note.biz.model.dto.LikeUnlikeNoteMqDTO;
|
||||||
import com.hanserwei.hannote.note.biz.service.NoteLikeDOService;
|
import jakarta.annotation.PreDestroy;
|
||||||
import jakarta.annotation.Resource;
|
import jakarta.annotation.Resource;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.rocketmq.client.producer.SendCallback;
|
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||||
import org.apache.rocketmq.client.producer.SendResult;
|
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
|
||||||
import org.apache.rocketmq.common.message.Message;
|
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
|
||||||
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
import org.apache.rocketmq.client.exception.MQClientException;
|
||||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
|
||||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
|
||||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.messaging.support.MessageBuilder;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@SuppressWarnings({"UnstableApiUsage"})
|
@SuppressWarnings({"UnstableApiUsage"})
|
||||||
@Component
|
@Component
|
||||||
@RocketMQMessageListener(
|
|
||||||
consumerGroup = "han_note_group_" + MQConstants.TOPIC_LIKE_OR_UNLIKE,
|
|
||||||
topic = MQConstants.TOPIC_LIKE_OR_UNLIKE,
|
|
||||||
consumeMode = ConsumeMode.ORDERLY// 顺序消费
|
|
||||||
)
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class LikeUnlikeNoteConsumer implements RocketMQListener<Message> {
|
public class LikeUnlikeNoteConsumer {
|
||||||
|
|
||||||
// 每秒创建 5000 个令牌
|
// 每秒创建5000个令牌
|
||||||
private final RateLimiter rateLimiter = RateLimiter.create(5000);
|
private final RateLimiter rateLimiter = RateLimiter.create(5000);
|
||||||
|
@Value("${rocketmq.name-server}")
|
||||||
|
private String nameServer;
|
||||||
|
|
||||||
|
private DefaultMQPushConsumer consumer;
|
||||||
@Resource
|
@Resource
|
||||||
private NoteLikeDOMapper noteLikeDOMapper;
|
private NoteLikeDOMapper noteLikeDOMapper;
|
||||||
@Resource
|
|
||||||
private NoteLikeDOService noteLikeDOService;
|
|
||||||
@Resource
|
|
||||||
private RocketMQTemplate rocketMQTemplate;
|
|
||||||
|
|
||||||
@Override
|
@Bean(name = "LikeUnLikeNoteConsumer")
|
||||||
public void onMessage(Message message) {
|
public DefaultMQPushConsumer mqPushConsumer() throws MQClientException {
|
||||||
// 流量削峰:通过获取令牌,如果没有令牌可用,将阻塞,直到获得
|
// Group组
|
||||||
|
String group = "han_note_group_" + MQConstants.TOPIC_LIKE_OR_UNLIKE;
|
||||||
|
|
||||||
|
// 创建一个新的 DefaultMQPushConsumer 实例,并指定消费者的消费组名
|
||||||
|
consumer = new DefaultMQPushConsumer(group);
|
||||||
|
|
||||||
|
// 设置 RocketMQ 的 NameServer 地址
|
||||||
|
consumer.setNamesrvAddr(nameServer);
|
||||||
|
|
||||||
|
// 订阅指定的主题,并设置主题的订阅规则("*" 表示订阅所有标签的消息)
|
||||||
|
consumer.subscribe(MQConstants.TOPIC_LIKE_OR_UNLIKE, "*");
|
||||||
|
|
||||||
|
// 设置消费者消费消息的起始位置,如果队列中没有消息,则从最新的消息开始消费。
|
||||||
|
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
|
||||||
|
|
||||||
|
// 设置消息消费模式,这里使用集群模式 (CLUSTERING)
|
||||||
|
consumer.setMessageModel(MessageModel.CLUSTERING);
|
||||||
|
|
||||||
|
// 最大重试次数, 以防消息重试过多次仍然没有成功,避免消息卡在消费队列中。
|
||||||
|
consumer.setMaxReconsumeTimes(3);
|
||||||
|
// 设置每批次消费的最大消息数量,这里设置为 30,表示每次拉取时最多消费 30 条消息。
|
||||||
|
consumer.setConsumeMessageBatchMaxSize(30);
|
||||||
|
// 设置拉取间隔, 单位毫秒
|
||||||
|
consumer.setPullInterval(1000);
|
||||||
|
|
||||||
|
// 注册消息监听器
|
||||||
|
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
|
||||||
|
log.info("==> 【笔记点赞、取消点赞】本批次消息大小: {}", msgs.size());
|
||||||
|
try {
|
||||||
|
// 令牌桶流控, 以控制数据库能够承受的 QPS
|
||||||
rateLimiter.acquire();
|
rateLimiter.acquire();
|
||||||
|
|
||||||
// 幂等性,通过联合索引保证
|
// 幂等性: 通过联合唯一索引保证
|
||||||
|
|
||||||
// 消息体
|
// 消息体 Json 字符串转 DTO
|
||||||
String bodyJsonStr = new String(message.getBody());
|
List<LikeUnlikeNoteMqDTO> likeUnlikeNoteMqDTOS = Lists.newArrayList();
|
||||||
// 标签
|
msgs.forEach(msg -> {
|
||||||
String tags = message.getTags();
|
String msgJson = new String(msg.getBody());
|
||||||
|
log.info("==> Consumer - Received message: {}", msgJson);
|
||||||
log.info("==> LikeUnlikeNoteConsumer 消费了消息 {}, tags: {}", bodyJsonStr, tags);
|
likeUnlikeNoteMqDTOS.add(JsonUtils.parseObject(msgJson, LikeUnlikeNoteMqDTO.class));
|
||||||
|
|
||||||
// 根据 MQ 标签,判断操作类型
|
|
||||||
if (Objects.equals(tags, MQConstants.TAG_LIKE)) { // 点赞笔记
|
|
||||||
handleLikeNoteTagMessage(bodyJsonStr);
|
|
||||||
} else if (Objects.equals(tags, MQConstants.TAG_UNLIKE)) { // 取消点赞笔记
|
|
||||||
handleUnlikeNoteTagMessage(bodyJsonStr);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 处理取消点赞笔记的 MQ 消息
|
|
||||||
*
|
|
||||||
* @param bodyJsonStr 消息体
|
|
||||||
*/
|
|
||||||
private void handleUnlikeNoteTagMessage(String bodyJsonStr) {
|
|
||||||
// 消息体 JSON 字符串转 DTO
|
|
||||||
LikeUnlikeNoteMqDTO unlikeNoteMqDTO = JsonUtils.parseObject(bodyJsonStr, LikeUnlikeNoteMqDTO.class);
|
|
||||||
if (Objects.isNull(unlikeNoteMqDTO)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// 用户ID
|
|
||||||
Long userId = unlikeNoteMqDTO.getUserId();
|
|
||||||
// 点赞的笔记ID
|
|
||||||
Long noteId = unlikeNoteMqDTO.getNoteId();
|
|
||||||
// 操作类型
|
|
||||||
Integer type = unlikeNoteMqDTO.getType();
|
|
||||||
// 取消点赞时间
|
|
||||||
LocalDateTime createTime = unlikeNoteMqDTO.getCreateTime();
|
|
||||||
|
|
||||||
// 设置要更新的字段值
|
|
||||||
NoteLikeDO updateEntity = NoteLikeDO.builder()
|
|
||||||
.createTime(createTime) // 更新时间
|
|
||||||
.status(type) // 设置新的状态值 (例如 0 表示取消点赞)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
// 设置更新条件:where user_id = [userId] and note_id = [noteId] and status = 1
|
|
||||||
LambdaQueryWrapper<NoteLikeDO> wrapper = new LambdaQueryWrapper<>();
|
|
||||||
wrapper.eq(NoteLikeDO::getUserId, userId)
|
|
||||||
.eq(NoteLikeDO::getNoteId, noteId)
|
|
||||||
.eq(NoteLikeDO::getStatus, LikeUnlikeNoteTypeEnum.LIKE.getCode()); // 确保只更新当前为“已点赞”的记录
|
|
||||||
|
|
||||||
// 执行更新
|
|
||||||
boolean update = noteLikeDOService.update(updateEntity, wrapper);
|
|
||||||
log.info("==> 【取消点赞笔记】更新数据库成功,update: {}", update);
|
|
||||||
|
|
||||||
if (!update) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// 更新数据库成功后,发送计数 MQ
|
|
||||||
org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(bodyJsonStr)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
// 异步发送 MQ 消息
|
|
||||||
rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_LIKE, message, new SendCallback() {
|
|
||||||
@Override
|
|
||||||
public void onSuccess(SendResult sendResult) {
|
|
||||||
log.info("==> 【计数: 笔记取消点赞】MQ 发送成功,SendResult: {}", sendResult);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onException(Throwable throwable) {
|
|
||||||
log.error("==> 【计数: 笔记取消点赞】MQ 发送异常: ", throwable);
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// 1.内存级操作合并
|
||||||
|
//按用户ID分组
|
||||||
|
Map<Long, List<LikeUnlikeNoteMqDTO>> groupMap = likeUnlikeNoteMqDTOS.stream()
|
||||||
|
.collect(Collectors.groupingBy(LikeUnlikeNoteMqDTO::getUserId));
|
||||||
|
//对每个用户按照用户ID分组并且过滤合并
|
||||||
|
// 对每个用户的操作按 noteId 二次分组,并过滤合并
|
||||||
|
List<LikeUnlikeNoteMqDTO> finalOperations = groupMap.values().stream()
|
||||||
|
.flatMap(userOperations -> {
|
||||||
|
// 按 noteId 分组
|
||||||
|
Map<Long, List<LikeUnlikeNoteMqDTO>> noteGroupMap = userOperations.stream()
|
||||||
|
.collect(Collectors.groupingBy(LikeUnlikeNoteMqDTO::getNoteId));
|
||||||
|
|
||||||
|
// 处理每个 noteId 的分组
|
||||||
|
// 取最后一次操作(消息是有序的)
|
||||||
|
return noteGroupMap.values().stream()
|
||||||
|
.filter(operations -> {
|
||||||
|
int size = operations.size();
|
||||||
|
// 根据奇偶性判断是否需要处理
|
||||||
|
// 偶数次操作:最终状态抵消,无需写入
|
||||||
|
// 奇数次操作:保留最后一次操作
|
||||||
|
return size % 2 != 0;
|
||||||
|
})
|
||||||
|
.map(List::getLast);
|
||||||
|
})
|
||||||
|
.toList();
|
||||||
|
|
||||||
|
// 2. 批量写入数据库
|
||||||
|
if (CollUtil.isNotEmpty(finalOperations)) {
|
||||||
|
// DTO 转 DO
|
||||||
|
List<NoteLikeDO> noteLikeDOS = finalOperations.stream()
|
||||||
|
.map(finalOperation -> NoteLikeDO.builder()
|
||||||
|
.userId(finalOperation.getUserId())
|
||||||
|
.noteId(finalOperation.getNoteId())
|
||||||
|
.createTime(finalOperation.getCreateTime())
|
||||||
|
.status(finalOperation.getType())
|
||||||
|
.build())
|
||||||
|
.toList();
|
||||||
|
|
||||||
|
// 批量写入
|
||||||
|
noteLikeDOMapper.batchInsertOrUpdate(noteLikeDOS);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* 处理点赞笔记的 MQ 消息
|
|
||||||
*
|
|
||||||
* @param bodyJsonStr 消息体
|
|
||||||
*/
|
|
||||||
private void handleLikeNoteTagMessage(String bodyJsonStr) {
|
|
||||||
// 消息体 JSON 字符串转 DTO
|
|
||||||
LikeUnlikeNoteMqDTO likeNoteMqDTO = JsonUtils.parseObject(bodyJsonStr, LikeUnlikeNoteMqDTO.class);
|
|
||||||
|
|
||||||
if (Objects.isNull(likeNoteMqDTO)) return;
|
// 手动 ACK,告诉 RocketMQ 这批次消息消费成功
|
||||||
|
return ConsumeOrderlyStatus.SUCCESS;
|
||||||
// 用户ID
|
} catch (Exception e) {
|
||||||
Long userId = likeNoteMqDTO.getUserId();
|
log.error("", e);
|
||||||
// 点赞的笔记ID
|
// 这样 RocketMQ 会暂停当前队列的消费一段时间,再重试
|
||||||
Long noteId = likeNoteMqDTO.getNoteId();
|
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
|
||||||
// 操作类型
|
|
||||||
Integer type = likeNoteMqDTO.getType();
|
|
||||||
// 点赞时间
|
|
||||||
LocalDateTime createTime = likeNoteMqDTO.getCreateTime();
|
|
||||||
|
|
||||||
// 构建 DO 对象
|
|
||||||
NoteLikeDO noteLikeDO = NoteLikeDO.builder()
|
|
||||||
.userId(userId)
|
|
||||||
.noteId(noteId)
|
|
||||||
.createTime(createTime)
|
|
||||||
.status(type)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
// 添加或更新笔记点赞记录
|
|
||||||
boolean count = noteLikeDOMapper.insertOrUpdate(noteLikeDO);
|
|
||||||
|
|
||||||
if (!count) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 发送计数 MQ
|
|
||||||
// 更新数据库成功后,发送计数 MQ
|
|
||||||
org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(bodyJsonStr)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
// 异步发送 MQ 消息
|
|
||||||
rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_NOTE_LIKE, message, new SendCallback() {
|
|
||||||
@Override
|
|
||||||
public void onSuccess(SendResult sendResult) {
|
|
||||||
log.info("==> 【计数: 笔记点赞】MQ 发送成功,SendResult: {}", sendResult);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onException(Throwable throwable) {
|
|
||||||
log.error("==> 【计数: 笔记点赞】MQ 发送异常: ", throwable);
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// 启动消费者
|
||||||
|
consumer.start();
|
||||||
|
return consumer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@PreDestroy
|
||||||
|
public void destroy() {
|
||||||
|
if (Objects.nonNull(consumer)) {
|
||||||
|
try {
|
||||||
|
consumer.shutdown(); // 关闭消费者
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,21 +17,11 @@ public interface MQConstants {
|
|||||||
*/
|
*/
|
||||||
String TOPIC_LIKE_OR_UNLIKE = "LikeUnlikeTopic";
|
String TOPIC_LIKE_OR_UNLIKE = "LikeUnlikeTopic";
|
||||||
|
|
||||||
/**
|
|
||||||
* Topic: 计数 - 笔记点赞数
|
|
||||||
*/
|
|
||||||
String TOPIC_COUNT_NOTE_LIKE = "CountNoteLikeTopic";
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Topic: 收藏、取消收藏共用一个
|
* Topic: 收藏、取消收藏共用一个
|
||||||
*/
|
*/
|
||||||
String TOPIC_COLLECT_OR_UN_COLLECT = "CollectUnCollectTopic";
|
String TOPIC_COLLECT_OR_UN_COLLECT = "CollectUnCollectTopic";
|
||||||
|
|
||||||
/**
|
|
||||||
* Topic: 计数 - 笔记收藏数
|
|
||||||
*/
|
|
||||||
String TOPIC_COUNT_NOTE_COLLECT = "CountNoteCollectTopic";
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Topic: 笔记操作(发布、删除)
|
* Topic: 笔记操作(发布、删除)
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -3,6 +3,9 @@ package com.hanserwei.hannote.note.biz.domain.mapper;
|
|||||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||||
import com.hanserwei.hannote.note.biz.domain.dataobject.NoteCollectionDO;
|
import com.hanserwei.hannote.note.biz.domain.dataobject.NoteCollectionDO;
|
||||||
import org.apache.ibatis.annotations.Mapper;
|
import org.apache.ibatis.annotations.Mapper;
|
||||||
|
import org.apache.ibatis.annotations.Param;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
@Mapper
|
@Mapper
|
||||||
public interface NoteCollectionDOMapper extends BaseMapper<NoteCollectionDO> {
|
public interface NoteCollectionDOMapper extends BaseMapper<NoteCollectionDO> {
|
||||||
@@ -21,4 +24,12 @@ public interface NoteCollectionDOMapper extends BaseMapper<NoteCollectionDO> {
|
|||||||
* @return 影响行数
|
* @return 影响行数
|
||||||
*/
|
*/
|
||||||
int update2UnCollectByUserIdAndNoteId(NoteCollectionDO noteCollectionDO);
|
int update2UnCollectByUserIdAndNoteId(NoteCollectionDO noteCollectionDO);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 批量新增笔记收藏记录
|
||||||
|
*
|
||||||
|
* @param noteCollectionDOS 笔记收藏记录
|
||||||
|
* @return 影响行数
|
||||||
|
*/
|
||||||
|
int batchInsertOrUpdate(@Param("noteCollectionDOS") List<NoteCollectionDO> noteCollectionDOS);
|
||||||
}
|
}
|
||||||
@@ -3,6 +3,9 @@ package com.hanserwei.hannote.note.biz.domain.mapper;
|
|||||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||||
import com.hanserwei.hannote.note.biz.domain.dataobject.NoteLikeDO;
|
import com.hanserwei.hannote.note.biz.domain.dataobject.NoteLikeDO;
|
||||||
import org.apache.ibatis.annotations.Mapper;
|
import org.apache.ibatis.annotations.Mapper;
|
||||||
|
import org.apache.ibatis.annotations.Param;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
@Mapper
|
@Mapper
|
||||||
public interface NoteLikeDOMapper extends BaseMapper<NoteLikeDO> {
|
public interface NoteLikeDOMapper extends BaseMapper<NoteLikeDO> {
|
||||||
@@ -13,4 +16,12 @@ public interface NoteLikeDOMapper extends BaseMapper<NoteLikeDO> {
|
|||||||
* @return 影响行数
|
* @return 影响行数
|
||||||
*/
|
*/
|
||||||
boolean insertOrUpdate(NoteLikeDO noteLikeDO);
|
boolean insertOrUpdate(NoteLikeDO noteLikeDO);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 批量插入或更新
|
||||||
|
*
|
||||||
|
* @param noteLikeDOS 批量笔记点赞记录
|
||||||
|
* @return 影响行数
|
||||||
|
*/
|
||||||
|
int batchInsertOrUpdate(@Param("noteLikeDOS") List<NoteLikeDO> noteLikeDOS);
|
||||||
}
|
}
|
||||||
@@ -31,4 +31,13 @@
|
|||||||
and note_id = #{noteId}
|
and note_id = #{noteId}
|
||||||
and status = 1
|
and status = 1
|
||||||
</update>
|
</update>
|
||||||
|
|
||||||
|
<insert id="batchInsertOrUpdate" parameterType="list">
|
||||||
|
INSERT INTO t_note_collection (user_id, note_id, status, create_time)
|
||||||
|
VALUES
|
||||||
|
<foreach item="item" collection="noteCollectionDOS" separator=",">
|
||||||
|
(#{item.userId}, #{item.noteId}, #{item.status}, #{item.createTime})
|
||||||
|
</foreach>
|
||||||
|
ON DUPLICATE KEY UPDATE status = VALUES(status)
|
||||||
|
</insert>
|
||||||
</mapper>
|
</mapper>
|
||||||
@@ -21,4 +21,13 @@
|
|||||||
ON DUPLICATE KEY UPDATE
|
ON DUPLICATE KEY UPDATE
|
||||||
create_time = #{createTime}, status = #{status};
|
create_time = #{createTime}, status = #{status};
|
||||||
</insert>
|
</insert>
|
||||||
|
|
||||||
|
<insert id="batchInsertOrUpdate" parameterType="list">
|
||||||
|
INSERT INTO t_note_like (user_id, note_id, status, create_time)
|
||||||
|
VALUES
|
||||||
|
<foreach item="item" collection="noteLikeDOS" separator=",">
|
||||||
|
(#{item.userId}, #{item.noteId}, #{item.status}, #{item.createTime})
|
||||||
|
</foreach>
|
||||||
|
ON DUPLICATE KEY UPDATE status = VALUES(status)
|
||||||
|
</insert>
|
||||||
</mapper>
|
</mapper>
|
||||||
@@ -108,6 +108,12 @@
|
|||||||
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
|
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.rocketmq</groupId>
|
||||||
|
<artifactId>rocketmq-client</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
<build>
|
<build>
|
||||||
<plugins>
|
<plugins>
|
||||||
|
|||||||
@@ -298,9 +298,9 @@ Authorization: Bearer {{token}}
|
|||||||
|
|
||||||
{
|
{
|
||||||
"noteId": 1862481582414102549,
|
"noteId": 1862481582414102549,
|
||||||
"content": "这是一条回复测试评论",
|
"content": "这是一条测试评论计数的二级评论333",
|
||||||
"imageUrl": "https://cdn.pixabay.com/photo/2025/10/05/15/06/autumn-9875155_1280.jpg",
|
"imageUrl": "https://cdn.pixabay.com/photo/2025/10/05/15/06/autumn-9875155_1280.jpg",
|
||||||
"replyCommentId": 2001
|
"replyCommentId": 4002
|
||||||
}
|
}
|
||||||
|
|
||||||
### 批量添加评论
|
### 批量添加评论
|
||||||
|
|||||||
@@ -280,4 +280,9 @@ CREATE TABLE `t_comment_like`
|
|||||||
DEFAULT CHARSET = utf8mb4
|
DEFAULT CHARSET = utf8mb4
|
||||||
COLLATE = utf8mb4_unicode_ci COMMENT ='评论点赞表';
|
COLLATE = utf8mb4_unicode_ci COMMENT ='评论点赞表';
|
||||||
|
|
||||||
|
-- 表:t_comment表冗余字段
|
||||||
|
alter table t_comment
|
||||||
|
add column `child_comment_total` bigint(20) unsigned DEFAULT '0' COMMENT '二级评论总数(只有一级评论才需要统计)';
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user