feat(count): 实现粉丝与关注计数服务

- 新增粉丝数与关注数的 MQ 消费逻辑
- 实现 Redis 计数更新与数据库落库操作
- 添加流量削峰限流机制提升系统稳定性
- 完善计数 DTO 与枚举类型定义
- 扩展 JsonUtils 工具类支持 Map 转换
- 更新 MQ 常量与 Redis Key 管理策略
-优化 MyBatis Mapper 支持计数插入或更新操作,Mybatis-plus操作起来属于硬编码,所以使用MyBatis的XML形式
This commit is contained in:
2025-10-16 19:21:28 +08:00
parent c6ac7193c1
commit d59acad051
12 changed files with 422 additions and 7 deletions

View File

@@ -12,4 +12,14 @@ public interface MQConstants {
*/ */
String TOPIC_COUNT_FANS = "CountFansTopic"; String TOPIC_COUNT_FANS = "CountFansTopic";
/**
* Topic: 粉丝数计数入库
*/
String TOPIC_COUNT_FANS_2_DB = "CountFans2DBTopic";
/**
* Topic: 粉丝数计数入库
*/
String TOPIC_COUNT_FOLLOWING_2_DB = "CountFollowing2DBTopic";
} }

View File

@@ -0,0 +1,29 @@
package com.hanserwei.hannote.count.biz.constant;
public class RedisKeyConstants {
/**
* Hash Field: 粉丝总数
*/
public static final String FIELD_FANS_TOTAL = "fansTotal";
/**
* Hash Field: 关注总数
*/
public static final String FIELD_FOLLOWING_TOTAL = "followingTotal";
/**
* 用户维度计数 Key 前缀
*/
private static final String COUNT_USER_KEY_PREFIX = "count:user:";
/**
* 构建用户维度计数 Key
*
* @param userId 用户ID
* @return 用户维度计数 Key
*/
public static String buildCountUserKey(Long userId) {
return COUNT_USER_KEY_PREFIX + userId;
}
}

View File

@@ -0,0 +1,46 @@
package com.hanserwei.hannote.count.biz.consumer;
import cn.hutool.core.collection.CollUtil;
import com.google.common.util.concurrent.RateLimiter;
import com.hanserwei.framework.common.utils.JsonUtils;
import com.hanserwei.hannote.count.biz.constant.MQConstants;
import com.hanserwei.hannote.count.biz.domain.mapper.UserCountDOMapper;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@SuppressWarnings("ALL")
@Component
@RocketMQMessageListener(consumerGroup = "han_note_" + MQConstants.TOPIC_COUNT_FANS_2_DB, // Group 组
topic = MQConstants.TOPIC_COUNT_FANS_2_DB // 主题 Topic
)
@Slf4j
public class CountFans2DBConsumer implements RocketMQListener<String> {
@Resource
private UserCountDOMapper userCountDOMapper;
// 每秒创建 5000 个令牌
private RateLimiter rateLimiter = RateLimiter.create(5000);
@Override
public void onMessage(String body) {
// 流量削峰:通过获取令牌,如果没有令牌可用,将阻塞,直到获得
rateLimiter.acquire();
log.info("## 消费到了 MQ 【计数: 粉丝数入库】, {}...", body);
Map<Long, Integer> countMap = null;
try {
countMap = JsonUtils.parseMap(body, Long.class, Integer.class);
} catch (Exception e) {
log.error("## 解析 JSON 字符串异常", e);
}
if (CollUtil.isNotEmpty(countMap)) {
// 判断数据库中,若目标用户的记录不存在,则插入;若记录已存在,则直接更新
countMap.forEach((k, v) -> userCountDOMapper.insertOrUpdateFansTotalByUserId(v, k));
}
}
}

View File

@@ -1,15 +1,29 @@
package com.hanserwei.hannote.count.biz.consumer; package com.hanserwei.hannote.count.biz.consumer;
import com.github.phantomthief.collection.BufferTrigger; import com.github.phantomthief.collection.BufferTrigger;
import com.google.common.collect.Maps;
import com.hanserwei.framework.common.utils.JsonUtils; import com.hanserwei.framework.common.utils.JsonUtils;
import com.hanserwei.hannote.count.biz.constant.MQConstants; import com.hanserwei.hannote.count.biz.constant.MQConstants;
import com.hanserwei.hannote.count.biz.constant.RedisKeyConstants;
import com.hanserwei.hannote.count.biz.enums.FollowUnfollowTypeEnum;
import com.hanserwei.hannote.count.biz.model.dto.CountFollowUnfollowMqDTO;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.time.Duration; import java.time.Duration;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
@Component @Component
@RocketMQMessageListener( @RocketMQMessageListener(
@@ -18,20 +32,96 @@ import java.util.List;
) )
@Slf4j @Slf4j
public class CountFansConsumer implements RocketMQListener<String> { public class CountFansConsumer implements RocketMQListener<String> {
@Resource
private RedisTemplate<String, Object> redisTemplate;
@Resource
private RocketMQTemplate rocketMQTemplate;
private final BufferTrigger<String> bufferTrigger = BufferTrigger.<String>batchBlocking() private final BufferTrigger<String> bufferTrigger = BufferTrigger.<String>batchBlocking()
.bufferSize(50000) // 缓存队列的最大容量 .bufferSize(50000) // 缓存队列的最大容量
.batchSize(1000) // 一批次最多聚合 1000 条 .batchSize(1000) // 一批次最多聚合 1000 条
.linger(Duration.ofSeconds(1)) // 多久聚合一次 .linger(Duration.ofSeconds(1)) // 多久聚合一次
.setConsumerEx(this::consumeMessage) .setConsumerEx(this::consumeMessage)
.build(); .build();
@Override @Override
public void onMessage(String body) { public void onMessage(String body) {
// 往 bufferTrigger 中添加元素 // 往 bufferTrigger 中添加元素
bufferTrigger.enqueue(body); bufferTrigger.enqueue(body);
} }
private void consumeMessage(List<String> bodys) { private void consumeMessage(List<String> body) {
log.info("==> 聚合消息, size: {}", bodys.size()); log.info("==> 聚合消息, size: {}", body.size());
log.info("==> 聚合消息, {}", JsonUtils.toJsonString(bodys)); log.info("==> 聚合消息, {}", JsonUtils.toJsonString(body));
// List<String> body 转换成 List<CountFollowUnfollowMqDTO>
List<CountFollowUnfollowMqDTO> countFollowUnfollowMqDTOList = body.stream()
.map(e -> JsonUtils.parseObject(e, CountFollowUnfollowMqDTO.class))
.toList();
// 按目标用户进行分组
Map<Long, List<CountFollowUnfollowMqDTO>> groupMap = countFollowUnfollowMqDTOList.stream()
.collect(Collectors.groupingBy(CountFollowUnfollowMqDTO::getTargetUserId));
// 按组汇聚数据,统计出最终数据
Map<Long, Integer> countMap = Maps.newHashMap();
for (Map.Entry<Long, List<CountFollowUnfollowMqDTO>> entry : groupMap.entrySet()) {
List<CountFollowUnfollowMqDTO> list = entry.getValue();
// 最终数据
int finalCount = 0;
for (CountFollowUnfollowMqDTO countFollowUnfollowMqDTO : list) {
// 获取操作类型
Integer type = countFollowUnfollowMqDTO.getType();
// 根据操作类型,获取对应枚举
FollowUnfollowTypeEnum followUnfollowTypeEnum = FollowUnfollowTypeEnum.valueOf(type);
// 若枚举类型为空,则跳过
if (Objects.isNull(followUnfollowTypeEnum)) {
continue;
}
switch (followUnfollowTypeEnum) {
case FOLLOW -> finalCount++;
case UNFOLLOW -> finalCount--;
}
}
// 将分组后统计出的最终计数,存入 countMap 中
countMap.put(entry.getKey(), finalCount);
}
log.info("## 聚合后的计数数据: {}", JsonUtils.toJsonString(countMap));
// 更新 Redis
countMap.forEach((k, v) -> {
// Redis Key
String redisKey = RedisKeyConstants.buildCountUserKey(k);
// 判断 Redis 中 Hash 是否存在
boolean isExisted = redisTemplate.hasKey(redisKey);
// 若存在才会更新
// (因为缓存设有过期时间,考虑到过期后,缓存会被删除,这里需要判断一下,存在才会去更新,而初始化工作放在查询计数来做)
if (isExisted) {
// 对目标用户 Hash 中的粉丝数字段进行计数操作
redisTemplate.opsForHash().increment(redisKey, RedisKeyConstants.FIELD_FANS_TOTAL, v);
}
});
// 发送 MQ, 计数数据落库
// 构建MQ消息体
Message<String> message = MessageBuilder.withPayload(JsonUtils.toJsonString(countMap))
.build();
// 异步发送消息提高接口响应速度
rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_FANS_2_DB, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("==> 【计数服务粉丝数入库】MQ 发送成功SendResult: {}", sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("==> 【计数服务粉丝数入库】MQ 发送异常: ", throwable);
}
});
} }
} }

View File

@@ -0,0 +1,54 @@
package com.hanserwei.hannote.count.biz.consumer;
import com.google.common.util.concurrent.RateLimiter;
import com.hanserwei.framework.common.utils.JsonUtils;
import com.hanserwei.hannote.count.biz.constant.MQConstants;
import com.hanserwei.hannote.count.biz.domain.mapper.UserCountDOMapper;
import com.hanserwei.hannote.count.biz.enums.FollowUnfollowTypeEnum;
import com.hanserwei.hannote.count.biz.model.dto.CountFollowUnfollowMqDTO;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.util.Objects;
@Component
@SuppressWarnings("ALL")
@RocketMQMessageListener(consumerGroup = "han_note_" + MQConstants.TOPIC_COUNT_FOLLOWING_2_DB, // Group 组
topic = MQConstants.TOPIC_COUNT_FOLLOWING_2_DB // 主题 Topic
)
@Slf4j
public class CountFollowing2DBConsumer implements RocketMQListener<String> {
@Resource
private UserCountDOMapper userCountDOMapper;
// 每秒创建 5000 个令牌
private RateLimiter rateLimiter = RateLimiter.create(5000);
@Override
public void onMessage(String body) {
// 流量削峰:通过获取令牌,如果没有令牌可用,将阻塞,直到获得
rateLimiter.acquire();
log.info("## 消费到了 MQ 【计数: 关注数入库】, {}...", body);
if (StringUtils.isBlank(body)) return;
CountFollowUnfollowMqDTO countFollowUnfollowMqDTO = JsonUtils.parseObject(body, CountFollowUnfollowMqDTO.class);
// 操作类型:关注 or 取关
Integer type = countFollowUnfollowMqDTO.getType();
// 原用户ID
Long userId = countFollowUnfollowMqDTO.getUserId();
// 关注数:关注 +1 取关 -1
int count = Objects.equals(type, FollowUnfollowTypeEnum.FOLLOW.getCode()) ? 1 : -1;
// 判断数据库中,若原用户的记录不存在,则插入;若记录已存在,则直接更新
userCountDOMapper.insertOrUpdateFollowingTotalByUserId(count, userId);
}
}

View File

@@ -1,11 +1,25 @@
package com.hanserwei.hannote.count.biz.consumer; package com.hanserwei.hannote.count.biz.consumer;
import com.hanserwei.framework.common.utils.JsonUtils;
import com.hanserwei.hannote.count.biz.constant.MQConstants; import com.hanserwei.hannote.count.biz.constant.MQConstants;
import com.hanserwei.hannote.count.biz.constant.RedisKeyConstants;
import com.hanserwei.hannote.count.biz.enums.FollowUnfollowTypeEnum;
import com.hanserwei.hannote.count.biz.model.dto.CountFollowUnfollowMqDTO;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Objects;
@Component @Component
@RocketMQMessageListener( @RocketMQMessageListener(
consumerGroup = "han_note_group_" + MQConstants.TOPIC_COUNT_FOLLOWING, consumerGroup = "han_note_group_" + MQConstants.TOPIC_COUNT_FOLLOWING,
@@ -13,8 +27,56 @@ import org.springframework.stereotype.Component;
) )
@Slf4j @Slf4j
public class CountFollowingConsumer implements RocketMQListener<String> { public class CountFollowingConsumer implements RocketMQListener<String> {
@Resource
private RedisTemplate<String, Object> redisTemplate;
@Resource
private RocketMQTemplate rocketMQTemplate;
@Override @Override
public void onMessage(String body) { public void onMessage(String body) {
log.info("## 消费了 MQ [计数:关注数]: {}", body); log.info("## 消费了 MQ [计数:关注数]: {}", body);
if (StringUtils.isBlank(body)) {
return;
}
// 关注数和粉丝数计数场景不同,单个用户无法短时间内关注大量用户,所以无需聚合
// 直接对 Redis 中的 Hash 进行 +1 或 -1 操作即可
CountFollowUnfollowMqDTO countFollowUnfollowMqDTO = JsonUtils.parseObject(body, CountFollowUnfollowMqDTO.class);
// 操作类型:关注 or 取关
assert countFollowUnfollowMqDTO != null;
Integer type = countFollowUnfollowMqDTO.getType();
// 原用户ID
Long userId = countFollowUnfollowMqDTO.getUserId();
// 更新 Redis
String redisKey = RedisKeyConstants.buildCountUserKey(userId);
// 判断 Hash 是否存在
boolean isExisted = redisTemplate.hasKey(redisKey);
// 若存在
if (isExisted) {
// 关注数:关注 +1 取关 -1
long count = Objects.equals(type, FollowUnfollowTypeEnum.FOLLOW.getCode()) ? 1 : -1;
// 对 Hash 中的 followingTotal 字段进行加减操作
redisTemplate.opsForHash().increment(redisKey, RedisKeyConstants.FIELD_FOLLOWING_TOTAL, count);
}
// 发送 MQ, 关注数写库
// 构建消息对象
Message<String> message = MessageBuilder.withPayload(body)
.build();
// 异步发送 MQ 消息
rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_FOLLOWING_2_DB, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("==> 【计数服务关注数入库】MQ 发送成功SendResult: {}", sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("==> 【计数服务关注数入库】MQ 发送异常: ", throwable);
}
});
} }
} }

View File

@@ -3,7 +3,26 @@ package com.hanserwei.hannote.count.biz.domain.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.hanserwei.hannote.count.biz.domain.dataobject.UserCountDO; import com.hanserwei.hannote.count.biz.domain.dataobject.UserCountDO;
import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
@Mapper @Mapper
public interface UserCountDOMapper extends BaseMapper<UserCountDO> { public interface UserCountDOMapper extends BaseMapper<UserCountDO> {
/**
* 添加或更新粉丝总数
*
* @param count 粉丝数
* @param userId 用户ID
* @return 影响行数
*/
int insertOrUpdateFansTotalByUserId(@Param("count") Integer count, @Param("userId") Long userId);
/**
* 添加或更新关注总数
*
* @param count 关注数
* @param userId 用户ID
* @return 影响行数
*/
int insertOrUpdateFollowingTotalByUserId(@Param("count") Integer count, @Param("userId") Long userId);
} }

View File

@@ -0,0 +1,28 @@
package com.hanserwei.hannote.count.biz.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.util.Objects;
@Getter
@AllArgsConstructor
public enum FollowUnfollowTypeEnum {
// 关注
FOLLOW(1),
// 取关
UNFOLLOW(0),
;
private final Integer code;
public static FollowUnfollowTypeEnum valueOf(Integer code) {
for (FollowUnfollowTypeEnum followUnfollowTypeEnum : FollowUnfollowTypeEnum.values()) {
if (Objects.equals(code, followUnfollowTypeEnum.getCode())) {
return followUnfollowTypeEnum;
}
}
return null;
}
}

View File

@@ -0,0 +1,29 @@
package com.hanserwei.hannote.count.biz.model.dto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class CountFollowUnfollowMqDTO {
/**
* 原用户
*/
private Long userId;
/**
* 目标用户
*/
private Long targetUserId;
/**
* 1:关注 0:取关
*/
private Integer type;
}

View File

@@ -16,4 +16,16 @@
<!--@mbg.generated--> <!--@mbg.generated-->
id, user_id, fans_total, following_total, note_total, like_total, collect_total id, user_id, fans_total, following_total, note_total, like_total, collect_total
</sql> </sql>
<insert id="insertOrUpdateFansTotalByUserId" parameterType="map">
INSERT INTO t_user_count (user_id, fans_total)
VALUES (#{userId}, #{count})
ON DUPLICATE KEY UPDATE fans_total = fans_total + (#{count});
</insert>
<insert id="insertOrUpdateFollowingTotalByUserId">
INSERT INTO t_user_count (user_id, following_total)
VALUES (#{userId}, #{count})
ON DUPLICATE KEY UPDATE following_total = following_total + (#{count});
</insert>
</mapper> </mapper>

View File

@@ -137,7 +137,7 @@ class MQTests {
// 构建消息体 DTO // 构建消息体 DTO
CountFollowUnfollowMqDTO countFollowUnfollowMqDTO = CountFollowUnfollowMqDTO.builder() CountFollowUnfollowMqDTO countFollowUnfollowMqDTO = CountFollowUnfollowMqDTO.builder()
.userId(i + 1) // 关注者用户 ID .userId(i + 1) // 关注者用户 ID
.targetUserId(27L) // 目标用户 .targetUserId(100L) // 目标用户
.type(FollowUnfollowTypeEnum.FOLLOW.getCode()) .type(FollowUnfollowTypeEnum.FOLLOW.getCode())
.build(); .build();
@@ -146,15 +146,28 @@ class MQTests {
.build(); .build();
// 发送 MQ 通知计数服务:统计粉丝数 // 发送 MQ 通知计数服务:统计粉丝数
rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_FANS, message, new SendCallback() { // rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_FANS, message, new SendCallback() {
// @Override
// public void onSuccess(SendResult sendResult) {
// log.info("==> 【计数服务粉丝数】MQ 发送成功SendResult: {}", sendResult);
// }
//
// @Override
// public void onException(Throwable throwable) {
// log.error("==> 【计数服务粉丝数】MQ 发送异常: ", throwable);
// }
// });
// 发送 MQ 通知计数服务:统计关注数
rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_FOLLOWING, message, new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
log.info("==> 【计数服务:粉丝数】MQ 发送成功SendResult: {}", sendResult); log.info("==> 【计数服务:关注数】MQ 发送成功SendResult: {}", sendResult);
} }
@Override @Override
public void onException(Throwable throwable) { public void onException(Throwable throwable) {
log.error("==> 【计数服务:粉丝数】MQ 发送异常: ", throwable); log.error("==> 【计数服务:关注数】MQ 发送异常: ", throwable);
} }
}); });
} }

View File

@@ -1,5 +1,6 @@
package com.hanserwei.framework.common.utils; package com.hanserwei.framework.common.utils;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.databind.SerializationFeature;
@@ -7,6 +8,8 @@ import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import java.util.Map;
public class JsonUtils { public class JsonUtils {
private static ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -51,4 +54,24 @@ public class JsonUtils {
return OBJECT_MAPPER.readValue(jsonStr, clazz); return OBJECT_MAPPER.readValue(jsonStr, clazz);
} }
/**
* 将 JSON 字符串转换为 Map
*
* @param jsonStr JSON 字符串
* @param keyClass 键的类型
* @param valueClass 值的类型
* @param <K> 键的类型
* @param <V> 值的类型
* @return Map
* @throws Exception 抛出异常
*/
public static <K, V> Map<K, V> parseMap(String jsonStr, Class<K> keyClass, Class<V> valueClass) throws Exception {
// 创建 TypeReference指定泛型类型
TypeReference<Map<K, V>> typeRef = new TypeReference<Map<K, V>>() {
};
// 将 JSON 字符串转换为 Map
return OBJECT_MAPPER.readValue(jsonStr, OBJECT_MAPPER.getTypeFactory().constructMapType(Map.class, keyClass, valueClass));
}
} }