feat(comment): 新增一级评论首条回复ID字段及更新机制
- 在 CommentDO 中新增 firstReplyCommentId 字段,用于记录一级评论下最早回复的评论 ID - 在 CommentDOMapper 中新增 selectEarliestByParentId 和 updateFirstReplyCommentIdByPrimaryKey 方法,用于查询和更新一级评论的首条回复 ID - 在 t_comment 表中新增 first_reply_comment_id 字段- 新增 OneLevelCommentFirstReplyCommentIdUpdateConsumer 消费者,用于异步更新一级评论的首条回复 ID- 新增 RedisKeyConstants 常量类,用于构建 Redis Key - 新增 RedisTemplateConfig 配置类,用于配置 RedisTemplate - 在 pom.xml 中新增 spring-boot-starter-data-redis 依赖
This commit is contained in:
@@ -0,0 +1,31 @@
|
||||
package com.hanserwei.hannote.comment.biz.config;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
|
||||
import org.springframework.data.redis.serializer.StringRedisSerializer;
|
||||
|
||||
@Configuration
|
||||
public class RedisTemplateConfig {
|
||||
|
||||
@Bean
|
||||
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
|
||||
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
|
||||
// 设置 RedisTemplate 的连接工厂
|
||||
redisTemplate.setConnectionFactory(connectionFactory);
|
||||
|
||||
// 使用 StringRedisSerializer 来序列化和反序列化 redis 的 key 值,确保 key 是可读的字符串
|
||||
redisTemplate.setKeySerializer(new StringRedisSerializer());
|
||||
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
|
||||
|
||||
// 使用 Jackson2JsonRedisSerializer 来序列化和反序列化 redis 的 value 值, 确保存储的是 JSON 格式
|
||||
Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
|
||||
redisTemplate.setValueSerializer(serializer);
|
||||
redisTemplate.setHashValueSerializer(serializer);
|
||||
|
||||
redisTemplate.afterPropertiesSet();
|
||||
return redisTemplate;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
package com.hanserwei.hannote.comment.biz.constants;
|
||||
|
||||
public class RedisKeyConstants {
|
||||
|
||||
/**
|
||||
* Key 前缀:一级评论的 first_reply_comment_id 字段值是否更新标识
|
||||
*/
|
||||
private static final String HAVE_FIRST_REPLY_COMMENT_KEY_PREFIX = "comment:havaFirstReplyCommentId:";
|
||||
|
||||
|
||||
/**
|
||||
* 构建完整 KEY
|
||||
*
|
||||
* @param commentId 一级评论 ID
|
||||
* @return 完整 KEY
|
||||
*/
|
||||
public static String buildHaveFirstReplyCommentKey(Long commentId) {
|
||||
return HAVE_FIRST_REPLY_COMMENT_KEY_PREFIX + commentId;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,164 @@
|
||||
package com.hanserwei.hannote.comment.biz.consumer;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.util.RandomUtil;
|
||||
import com.github.phantomthief.collection.BufferTrigger;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.hanserwei.framework.common.utils.JsonUtils;
|
||||
import com.hanserwei.hannote.comment.biz.constants.MQConstants;
|
||||
import com.hanserwei.hannote.comment.biz.constants.RedisKeyConstants;
|
||||
import com.hanserwei.hannote.comment.biz.domain.dataobject.CommentDO;
|
||||
import com.hanserwei.hannote.comment.biz.domain.mapper.CommentDOMapper;
|
||||
import com.hanserwei.hannote.comment.biz.enums.CommentLevelEnum;
|
||||
import com.hanserwei.hannote.comment.biz.model.dto.CountPublishCommentMqDTO;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.data.redis.core.RedisCallback;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.data.redis.core.ValueOperations;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Component
|
||||
@RocketMQMessageListener(consumerGroup = "han_note_group_first_reply_comment_id" + MQConstants.TOPIC_COUNT_NOTE_COMMENT, // Group 组
|
||||
topic = MQConstants.TOPIC_COUNT_NOTE_COMMENT // 主题 Topic
|
||||
)
|
||||
@Slf4j
|
||||
public class OneLevelCommentFirstReplyCommentIdUpdateConsumer implements RocketMQListener<String> {
|
||||
|
||||
@Resource
|
||||
private CommentDOMapper commentDOMapper;
|
||||
@Resource(name = "taskExecutor")
|
||||
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
|
||||
|
||||
@Resource
|
||||
private RedisTemplate<String, Object> redisTemplate;
|
||||
|
||||
private final BufferTrigger<String> bufferTrigger = BufferTrigger.<String>batchBlocking()
|
||||
.bufferSize(50000) // 缓存队列的最大容量
|
||||
.batchSize(1000) // 一批次最多聚合 1000 条
|
||||
.linger(Duration.ofSeconds(1)) // 多久聚合一次(1s 一次)
|
||||
.setConsumerEx(this::consumeMessage) // 设置消费者方法
|
||||
.build();
|
||||
|
||||
@Override
|
||||
public void onMessage(String body) {
|
||||
// 往 bufferTrigger 中添加元素
|
||||
bufferTrigger.enqueue(body);
|
||||
}
|
||||
|
||||
private void consumeMessage(List<String> bodys) {
|
||||
log.info("==> 【一级评论 first_reply_comment_id 更新】聚合消息, size: {}", bodys.size());
|
||||
log.info("==> 【一级评论 first_reply_comment_id 更新】聚合消息, {}", JsonUtils.toJsonString(bodys));
|
||||
|
||||
// 将聚合后的消息体 Json 转 List<CountPublishCommentMqDTO>
|
||||
List<CountPublishCommentMqDTO> publishCommentMqDTOS = Lists.newArrayList();
|
||||
bodys.forEach(body -> {
|
||||
try {
|
||||
List<CountPublishCommentMqDTO> list = JsonUtils.parseList(body, CountPublishCommentMqDTO.class);
|
||||
publishCommentMqDTOS.addAll(list);
|
||||
} catch (Exception e) {
|
||||
log.error("", e);
|
||||
}
|
||||
});
|
||||
|
||||
// 过滤出二级评论的 parent_id(即一级评论 ID),并去重,需要更新对应一级评论的 first_reply_comment_id
|
||||
List<Long> parentIds = publishCommentMqDTOS.stream()
|
||||
.filter(publishCommentMqDTO -> Objects.equals(publishCommentMqDTO.getLevel(), CommentLevelEnum.TWO.getCode()))
|
||||
.map(CountPublishCommentMqDTO::getParentId)
|
||||
.distinct() // 去重
|
||||
.toList();
|
||||
|
||||
if (CollUtil.isEmpty(parentIds)) return;
|
||||
|
||||
// 构建RedisKey
|
||||
List<String> keys = parentIds.stream()
|
||||
.map(RedisKeyConstants::buildHaveFirstReplyCommentKey)
|
||||
.toList();
|
||||
// 批量查询Redis
|
||||
List<Object> values = redisTemplate.opsForValue().multiGet(keys);
|
||||
|
||||
// 提取Redis中不存在的评论ID
|
||||
List<Long> missingCommentIds = Lists.newArrayList();
|
||||
|
||||
if (values != null) {
|
||||
for (int i = 0; i < values.size(); i++) {
|
||||
if (Objects.isNull(values.get(i))) {
|
||||
missingCommentIds.add(parentIds.get(i));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 存在的一级评论 ID,说明表中对应记录的 first_reply_comment_id 已经有值
|
||||
if (CollUtil.isNotEmpty(missingCommentIds)) {
|
||||
// 不存在的,则需要进一步查询数据库来确定,是否要更新记录对应的 first_reply_comment_id 值
|
||||
// 批量去数据库中查询
|
||||
List<CommentDO> commentDOS = commentDOMapper.selectByCommentIds(missingCommentIds);
|
||||
|
||||
// 异步将 first_reply_comment_id 不为 0 的一级评论 ID, 同步到 redis 中
|
||||
threadPoolTaskExecutor.submit(() -> {
|
||||
List<Long> needSyncCommentIds = commentDOS.stream()
|
||||
.filter(commentDO -> commentDO.getFirstReplyCommentId() != 0)
|
||||
.map(CommentDO::getId)
|
||||
.toList();
|
||||
|
||||
sync2Redis(needSyncCommentIds);
|
||||
});
|
||||
|
||||
// 过滤出值为 0 的,都需要更新其 first_reply_comment_id
|
||||
List<CommentDO> needUpdateCommentDOS = commentDOS.stream()
|
||||
.filter(commentDO -> commentDO.getFirstReplyCommentId() == 0)
|
||||
.toList();
|
||||
|
||||
needUpdateCommentDOS.forEach(needUpdateCommentDO -> {
|
||||
// 一级评论 ID
|
||||
Long needUpdateCommentId = needUpdateCommentDO.getId();
|
||||
|
||||
// 查询数据库,拿到一级评论最早回复的那条评论
|
||||
CommentDO earliestCommentDO = commentDOMapper.selectEarliestByParentId(needUpdateCommentId);
|
||||
|
||||
if (Objects.nonNull(earliestCommentDO)) {
|
||||
// 最早回复的那条评论 ID
|
||||
Long earliestCommentId = earliestCommentDO.getId();
|
||||
|
||||
// 更新其一级评论的 first_reply_comment_id
|
||||
commentDOMapper.updateFirstReplyCommentIdByPrimaryKey(earliestCommentId, needUpdateCommentId);
|
||||
|
||||
// 异步同步到 Redis
|
||||
threadPoolTaskExecutor.submit(() -> sync2Redis(Lists.newArrayList(needUpdateCommentId)));
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 异步将 first_reply_comment_id 不为 0 的一级评论 ID, 同步到 redis 中
|
||||
*
|
||||
* @param needSyncCommentIds 需要同步的评论 ID
|
||||
*/
|
||||
private void sync2Redis(List<Long> needSyncCommentIds) {
|
||||
// 获取 ValueOperations
|
||||
ValueOperations<String, Object> valueOperations = redisTemplate.opsForValue();
|
||||
|
||||
// 使用 RedisTemplate 的管道模式,允许在一个操作中批量发送多个命令,防止频繁操作 Redis
|
||||
redisTemplate.executePipelined((RedisCallback<?>) (connection) -> {
|
||||
needSyncCommentIds.forEach(needSyncCommentId -> {
|
||||
// 构建 Redis Key
|
||||
String key = RedisKeyConstants.buildHaveFirstReplyCommentKey(needSyncCommentId);
|
||||
|
||||
// 批量设置值并指定过期时间(5小时以内)
|
||||
valueOperations.set(key, 1, RandomUtil.randomInt(5 * 60 * 60), TimeUnit.SECONDS);
|
||||
});
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
@@ -115,4 +115,10 @@ public class CommentDO {
|
||||
*/
|
||||
@TableField(value = "update_time")
|
||||
private LocalDateTime updateTime;
|
||||
|
||||
/**
|
||||
* 一级评论的第一个回复的评论ID
|
||||
*/
|
||||
@TableField(value = "first_reply_comment_id")
|
||||
private Long firstReplyCommentId;
|
||||
}
|
||||
@@ -37,4 +37,22 @@ public interface CommentDOMapper extends BaseMapper<CommentDO> {
|
||||
*/
|
||||
int batchUpdateHeatByCommentIds(@Param("commentIds") List<Long> commentIds,
|
||||
@Param("commentHeatBOS") List<CommentHeatBO> commentHeatBOS);
|
||||
|
||||
/**
|
||||
* 查询一级评论下最早回复的评论
|
||||
*
|
||||
* @param parentId 一级评论 ID
|
||||
* @return 一级评论下最早回复的评论
|
||||
*/
|
||||
CommentDO selectEarliestByParentId(Long parentId);
|
||||
|
||||
/**
|
||||
* 更新一级评论的 first_reply_comment_id
|
||||
*
|
||||
* @param firstReplyCommentId 一级评论下最早回复的评论 ID
|
||||
* @param id 一级评论 ID
|
||||
* @return 更新数量
|
||||
*/
|
||||
int updateFirstReplyCommentIdByPrimaryKey(@Param("firstReplyCommentId") Long firstReplyCommentId,
|
||||
@Param("id") Long id);
|
||||
}
|
||||
@@ -20,6 +20,7 @@
|
||||
<result column="create_time" jdbcType="TIMESTAMP" property="createTime" />
|
||||
<result column="update_time" jdbcType="TIMESTAMP" property="updateTime" />
|
||||
<result column="child_comment_total" jdbcType="BIGINT" property="childCommentTotal"/>
|
||||
<result column="first_reply_comment_id" jdbcType="BIGINT" property="firstReplyCommentId"/>
|
||||
</resultMap>
|
||||
<sql id="Base_Column_List">
|
||||
<!--@mbg.generated-->
|
||||
@@ -38,7 +39,8 @@
|
||||
is_top,
|
||||
create_time,
|
||||
update_time,
|
||||
child_comment_total
|
||||
child_comment_total,
|
||||
first_reply_comment_id
|
||||
</sql>
|
||||
|
||||
<select id="selectByCommentIds" resultMap="BaseResultMap" parameterType="list">
|
||||
@@ -47,7 +49,8 @@
|
||||
parent_id,
|
||||
user_id,
|
||||
child_comment_total,
|
||||
like_total
|
||||
like_total,
|
||||
first_reply_comment_id
|
||||
from t_comment
|
||||
where id in
|
||||
<foreach collection="commentIds" open="(" separator="," close=")" item="commentId">
|
||||
@@ -82,4 +85,19 @@
|
||||
#{commentId}
|
||||
</foreach>
|
||||
</update>
|
||||
|
||||
<select id="selectEarliestByParentId" resultMap="BaseResultMap" parameterType="map">
|
||||
select id
|
||||
from t_comment
|
||||
where parent_id = #{parentId}
|
||||
and level = 2
|
||||
order by create_time
|
||||
limit 1
|
||||
</select>
|
||||
|
||||
<update id="updateFirstReplyCommentIdByPrimaryKey" parameterType="map">
|
||||
update t_comment
|
||||
set first_reply_comment_id = #{firstReplyCommentId}
|
||||
where id = #{id}
|
||||
</update>
|
||||
</mapper>
|
||||
Reference in New Issue
Block a user