@@ -1,192 +0,0 @@
|
|||||||
package com.hanserwei.hannote.comment.biz.consumer;
|
|
||||||
|
|
||||||
import cn.hutool.core.collection.CollUtil;
|
|
||||||
import com.google.common.collect.Lists;
|
|
||||||
import com.google.common.collect.Sets;
|
|
||||||
import com.google.common.util.concurrent.RateLimiter;
|
|
||||||
import com.hanserwei.framework.common.utils.JsonUtils;
|
|
||||||
import com.hanserwei.hannote.comment.biz.constants.MQConstants;
|
|
||||||
import com.hanserwei.hannote.comment.biz.constants.RedisKeyConstants;
|
|
||||||
import com.hanserwei.hannote.comment.biz.domain.dataobject.CommentDO;
|
|
||||||
import com.hanserwei.hannote.comment.biz.domain.mapper.CommentDOMapper;
|
|
||||||
import com.hanserwei.hannote.comment.biz.domain.mapper.NoteCountDOMapper;
|
|
||||||
import com.hanserwei.hannote.comment.biz.enums.CommentLevelEnum;
|
|
||||||
import jakarta.annotation.Resource;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.apache.rocketmq.client.producer.SendCallback;
|
|
||||||
import org.apache.rocketmq.client.producer.SendResult;
|
|
||||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
|
||||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
|
||||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
|
||||||
import org.springframework.data.redis.core.RedisTemplate;
|
|
||||||
import org.springframework.messaging.support.MessageBuilder;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
@SuppressWarnings("UnstableApiUsage")
|
|
||||||
@Component
|
|
||||||
@Slf4j
|
|
||||||
@RocketMQMessageListener(consumerGroup = "han_note_group_" + MQConstants.TOPIC_DELETE_COMMENT, // Group
|
|
||||||
topic = MQConstants.TOPIC_DELETE_COMMENT // 消费的主题 Topic
|
|
||||||
)
|
|
||||||
public class DeleteCommentConsumer implements RocketMQListener<String> {
|
|
||||||
|
|
||||||
// 每秒创建 1000 个令牌
|
|
||||||
private final RateLimiter rateLimiter = RateLimiter.create(1000);
|
|
||||||
@Resource
|
|
||||||
private CommentDOMapper commentDOMapper;
|
|
||||||
@Resource
|
|
||||||
private NoteCountDOMapper noteCountDOMapper;
|
|
||||||
@Resource
|
|
||||||
private RedisTemplate<String, Object> redisTemplate;
|
|
||||||
@Resource
|
|
||||||
private RocketMQTemplate rocketMQTemplate;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onMessage(String body) {
|
|
||||||
// 令牌桶流控
|
|
||||||
rateLimiter.acquire();
|
|
||||||
|
|
||||||
log.info("## 【删除评论 - 后续业务处理】消费者消费成功, body: {}", body);
|
|
||||||
|
|
||||||
CommentDO commentDO = JsonUtils.parseObject(body, CommentDO.class);
|
|
||||||
|
|
||||||
// 评论级别
|
|
||||||
Integer level = null;
|
|
||||||
if (commentDO != null) {
|
|
||||||
level = commentDO.getLevel();
|
|
||||||
}
|
|
||||||
|
|
||||||
CommentLevelEnum commentLevelEnum = CommentLevelEnum.valueOf(level);
|
|
||||||
|
|
||||||
if (commentLevelEnum != null) {
|
|
||||||
switch (commentLevelEnum) {
|
|
||||||
case ONE -> { // 一级评论
|
|
||||||
if (commentDO != null) {
|
|
||||||
handleOneLevelComment(commentDO);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case TWO -> { // 二级评论
|
|
||||||
if (commentDO != null) {
|
|
||||||
handleTwoLevelComment(commentDO);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 一级评论处理
|
|
||||||
*
|
|
||||||
* @param commentDO 评论
|
|
||||||
*/
|
|
||||||
private void handleOneLevelComment(CommentDO commentDO) {
|
|
||||||
Long commentId = commentDO.getId();
|
|
||||||
Long noteId = commentDO.getNoteId();
|
|
||||||
|
|
||||||
// 1. 关联评论删除(一级评论下所有子评论,都需要删除)
|
|
||||||
int count = commentDOMapper.deleteByParentId(commentId);
|
|
||||||
|
|
||||||
// 2. 计数更新(笔记下总评论数)
|
|
||||||
// 更新 Redis 缓存
|
|
||||||
String redisKey = RedisKeyConstants.buildNoteCommentTotalKey(noteId);
|
|
||||||
boolean hasKey = redisTemplate.hasKey(redisKey);
|
|
||||||
|
|
||||||
if (hasKey) {
|
|
||||||
// 笔记评论总数 -1
|
|
||||||
redisTemplate.opsForHash().increment(redisKey, RedisKeyConstants.FIELD_COMMENT_TOTAL, -(count + 1));
|
|
||||||
}
|
|
||||||
|
|
||||||
// 更新 t_note_count 计数表
|
|
||||||
noteCountDOMapper.updateCommentTotalByNoteId(noteId, -(count + 1));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 二级评论处理
|
|
||||||
*
|
|
||||||
* @param commentDO 评论
|
|
||||||
*/
|
|
||||||
private void handleTwoLevelComment(CommentDO commentDO) {
|
|
||||||
Long commentId = commentDO.getId();
|
|
||||||
|
|
||||||
// 1. 批量删除关联评论(递归查询回复评论,并批量删除)
|
|
||||||
List<Long> replyCommentIds = Lists.newArrayList();
|
|
||||||
recurrentGetReplyCommentId(replyCommentIds, commentId);
|
|
||||||
|
|
||||||
// 被删除的行数
|
|
||||||
int count = 0;
|
|
||||||
if (CollUtil.isNotEmpty(replyCommentIds)) {
|
|
||||||
count = commentDOMapper.deleteByIds(replyCommentIds);
|
|
||||||
}
|
|
||||||
|
|
||||||
// 2. 更新一级评论的计数
|
|
||||||
Long parentCommentId = commentDO.getParentId();
|
|
||||||
String redisKey = RedisKeyConstants.buildCountCommentKey(parentCommentId);
|
|
||||||
|
|
||||||
boolean hasKey = redisTemplate.hasKey(redisKey);
|
|
||||||
if (hasKey) {
|
|
||||||
redisTemplate.opsForHash().increment(redisKey, RedisKeyConstants.FIELD_CHILD_COMMENT_TOTAL, -(count + 1));
|
|
||||||
}
|
|
||||||
|
|
||||||
// 3. 若是最早的发布的二级评论被删除,需要更新一级评论的 first_reply_comment_id
|
|
||||||
|
|
||||||
// 查询一级评论
|
|
||||||
CommentDO oneLevelCommentDO = commentDOMapper.selectById(parentCommentId);
|
|
||||||
Long firstReplyCommentId = oneLevelCommentDO.getFirstReplyCommentId();
|
|
||||||
|
|
||||||
// 若删除的是最早回复的二级评论
|
|
||||||
if (Objects.equals(firstReplyCommentId, commentId)) {
|
|
||||||
// 查询数据库,重新获取一级评论最早回复的评论
|
|
||||||
CommentDO earliestCommentDO = commentDOMapper.selectEarliestByParentId(parentCommentId);
|
|
||||||
|
|
||||||
// 最早回复的那条评论 ID。若查询结果为 null, 则最早回复的评论 ID 为 null
|
|
||||||
Long earliestCommentId = Objects.nonNull(earliestCommentDO) ? earliestCommentDO.getId() : null;
|
|
||||||
// 更新其一级评论的 first_reply_comment_id
|
|
||||||
commentDOMapper.updateFirstReplyCommentIdByPrimaryKey(earliestCommentId, parentCommentId);
|
|
||||||
}
|
|
||||||
|
|
||||||
// 4. 重新计算一级评论的热度值
|
|
||||||
Set<Long> commentIds = Sets.newHashSetWithExpectedSize(1);
|
|
||||||
commentIds.add(parentCommentId);
|
|
||||||
|
|
||||||
// 异步发送计数 MQ, 更新评论热度值
|
|
||||||
org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(JsonUtils.toJsonString(commentIds))
|
|
||||||
.build();
|
|
||||||
|
|
||||||
// 异步发送 MQ 消息
|
|
||||||
rocketMQTemplate.asyncSend(MQConstants.TOPIC_COMMENT_HEAT_UPDATE, message, new SendCallback() {
|
|
||||||
@Override
|
|
||||||
public void onSuccess(SendResult sendResult) {
|
|
||||||
log.info("==> 【评论热度值更新】MQ 发送成功,SendResult: {}", sendResult);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onException(Throwable throwable) {
|
|
||||||
log.error("==> 【评论热度值更新】MQ 发送异常: ", throwable);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 递归获取全部回复的评论 ID
|
|
||||||
*
|
|
||||||
* @param commentIds 评论 ID 列表
|
|
||||||
* @param commentId 评论 ID
|
|
||||||
*/
|
|
||||||
private void recurrentGetReplyCommentId(List<Long> commentIds, Long commentId) {
|
|
||||||
CommentDO replyCommentDO = commentDOMapper.selectByReplyCommentId(commentId);
|
|
||||||
|
|
||||||
if (Objects.isNull(replyCommentDO)) return;
|
|
||||||
|
|
||||||
commentIds.add(replyCommentDO.getId());
|
|
||||||
Long replyCommentId = replyCommentDO.getId();
|
|
||||||
// 递归调用
|
|
||||||
recurrentGetReplyCommentId(commentIds, replyCommentId);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1,29 +0,0 @@
|
|||||||
package com.hanserwei.hannote.comment.biz.consumer;
|
|
||||||
|
|
||||||
import com.hanserwei.hannote.comment.biz.constants.MQConstants;
|
|
||||||
import com.hanserwei.hannote.comment.biz.service.CommentService;
|
|
||||||
import jakarta.annotation.Resource;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.apache.rocketmq.spring.annotation.MessageModel;
|
|
||||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
|
||||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
@Component
|
|
||||||
@Slf4j
|
|
||||||
@RocketMQMessageListener(consumerGroup = "han_note_group_" + MQConstants.TOPIC_DELETE_COMMENT_LOCAL_CACHE, // Group
|
|
||||||
topic = MQConstants.TOPIC_DELETE_COMMENT_LOCAL_CACHE, // 消费的主题 Topic
|
|
||||||
messageModel = MessageModel.BROADCASTING) // 广播模式
|
|
||||||
public class DeleteCommentLocalCacheConsumer implements RocketMQListener<String> {
|
|
||||||
|
|
||||||
@Resource
|
|
||||||
private CommentService commentService;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onMessage(String body) {
|
|
||||||
Long commentId = Long.valueOf(body);
|
|
||||||
log.info("## 消费者消费成功, commentId: {}", commentId);
|
|
||||||
|
|
||||||
commentService.deleteCommentLocalCache(commentId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -120,29 +120,4 @@ public interface CommentDOMapper extends BaseMapper<CommentDO> {
|
|||||||
*/
|
*/
|
||||||
List<CommentDO> selectChildCommentsByParentIdAndLimit(@Param("parentId") Long parentId,
|
List<CommentDO> selectChildCommentsByParentIdAndLimit(@Param("parentId") Long parentId,
|
||||||
@Param("limit") int limit);
|
@Param("limit") int limit);
|
||||||
|
|
||||||
/**
|
|
||||||
* 删除一级评论下,所有二级评论
|
|
||||||
*
|
|
||||||
* @param commentId 一级评论 ID
|
|
||||||
* @return 删除数量
|
|
||||||
*/
|
|
||||||
int deleteByParentId(Long commentId);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 批量删除评论
|
|
||||||
*
|
|
||||||
* @param commentIds 评论 ID 列表
|
|
||||||
* @return 删除数量
|
|
||||||
*/
|
|
||||||
int deleteByIds(@Param("commentIds") List<Long> commentIds);
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 根据 reply_comment_id 查询
|
|
||||||
*
|
|
||||||
* @param commentId 回复的评论 ID
|
|
||||||
* @return 评论
|
|
||||||
*/
|
|
||||||
CommentDO selectByReplyCommentId(Long commentId);
|
|
||||||
}
|
}
|
||||||
@@ -3,7 +3,6 @@ package com.hanserwei.hannote.comment.biz.domain.mapper;
|
|||||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||||
import com.hanserwei.hannote.comment.biz.domain.dataobject.NoteCountDO;
|
import com.hanserwei.hannote.comment.biz.domain.dataobject.NoteCountDO;
|
||||||
import org.apache.ibatis.annotations.Mapper;
|
import org.apache.ibatis.annotations.Mapper;
|
||||||
import org.apache.ibatis.annotations.Param;
|
|
||||||
|
|
||||||
@Mapper
|
@Mapper
|
||||||
public interface NoteCountDOMapper extends BaseMapper<NoteCountDO> {
|
public interface NoteCountDOMapper extends BaseMapper<NoteCountDO> {
|
||||||
@@ -15,14 +14,4 @@ public interface NoteCountDOMapper extends BaseMapper<NoteCountDO> {
|
|||||||
* @return 笔记评论总数
|
* @return 笔记评论总数
|
||||||
*/
|
*/
|
||||||
Long selectCommentTotalByNoteId(Long noteId);
|
Long selectCommentTotalByNoteId(Long noteId);
|
||||||
|
|
||||||
/**
|
|
||||||
* 更新评论总数
|
|
||||||
*
|
|
||||||
* @param noteId 笔记ID
|
|
||||||
* @param count 评论数
|
|
||||||
* @return 更新数量
|
|
||||||
*/
|
|
||||||
int updateCommentTotalByNoteId(@Param("noteId") Long noteId,
|
|
||||||
@Param("count") int count);
|
|
||||||
}
|
}
|
||||||
@@ -3,8 +3,6 @@ package com.hanserwei.hannote.comment.biz.enums;
|
|||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
|
|
||||||
import java.util.Objects;
|
|
||||||
|
|
||||||
@Getter
|
@Getter
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
public enum CommentLevelEnum {
|
public enum CommentLevelEnum {
|
||||||
@@ -16,19 +14,4 @@ public enum CommentLevelEnum {
|
|||||||
|
|
||||||
private final Integer code;
|
private final Integer code;
|
||||||
|
|
||||||
/**
|
|
||||||
* 根据类型 code 获取对应的枚举
|
|
||||||
*
|
|
||||||
* @param code 类型 code
|
|
||||||
* @return 枚举
|
|
||||||
*/
|
|
||||||
public static CommentLevelEnum valueOf(Integer code) {
|
|
||||||
for (CommentLevelEnum commentLevelEnum : CommentLevelEnum.values()) {
|
|
||||||
if (Objects.equals(code, commentLevelEnum.getCode())) {
|
|
||||||
return commentLevelEnum;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -54,11 +54,4 @@ public interface CommentService extends IService<CommentDO> {
|
|||||||
* @return 响应
|
* @return 响应
|
||||||
*/
|
*/
|
||||||
Response<?> deleteComment(DeleteCommentReqVO deleteCommentReqVO);
|
Response<?> deleteComment(DeleteCommentReqVO deleteCommentReqVO);
|
||||||
|
|
||||||
/**
|
|
||||||
* 删除本地评论缓存
|
|
||||||
*
|
|
||||||
* @param commentId 评论ID
|
|
||||||
*/
|
|
||||||
void deleteCommentLocalCache(Long commentId);
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -718,11 +718,6 @@ public class CommentServiceImpl extends ServiceImpl<CommentDOMapper, CommentDO>
|
|||||||
return Response.success();
|
return Response.success();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void deleteCommentLocalCache(Long commentId) {
|
|
||||||
LOCAL_CACHE.invalidate(commentId);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 初始化评论点赞布隆过滤器
|
* 初始化评论点赞布隆过滤器
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -201,26 +201,4 @@
|
|||||||
order by create_time
|
order by create_time
|
||||||
limit #{limit}
|
limit #{limit}
|
||||||
</select>
|
</select>
|
||||||
|
|
||||||
<delete id="deleteByParentId" parameterType="long">
|
|
||||||
delete
|
|
||||||
from t_comment
|
|
||||||
where parent_id = #{commentId}
|
|
||||||
</delete>
|
|
||||||
|
|
||||||
<delete id="deleteByIds" parameterType="map">
|
|
||||||
delete
|
|
||||||
from t_comment
|
|
||||||
where id in
|
|
||||||
<foreach collection="commentIds" item="commentId" open="(" separator="," close=")">
|
|
||||||
#{commentId}
|
|
||||||
</foreach>
|
|
||||||
</delete>
|
|
||||||
|
|
||||||
<select id="selectByReplyCommentId" resultMap="BaseResultMap" parameterType="long">
|
|
||||||
select
|
|
||||||
<include refid="Base_Column_List"/>
|
|
||||||
from t_comment
|
|
||||||
where reply_comment_id = #{commentId}
|
|
||||||
</select>
|
|
||||||
</mapper>
|
</mapper>
|
||||||
@@ -20,10 +20,4 @@
|
|||||||
from t_note_count
|
from t_note_count
|
||||||
where note_id = #{noteId}
|
where note_id = #{noteId}
|
||||||
</select>
|
</select>
|
||||||
|
|
||||||
<update id="updateCommentTotalByNoteId" parameterType="map">
|
|
||||||
update t_note_count
|
|
||||||
set comment_total = comment_total + #{count}
|
|
||||||
where note_id = #{noteId}
|
|
||||||
</update>
|
|
||||||
</mapper>
|
</mapper>
|
||||||
Reference in New Issue
Block a user