@@ -7,6 +7,8 @@ import com.hanserwei.hannote.user.relation.biz.constant.MQConstants;
import com.hanserwei.hannote.user.relation.biz.constant.RedisKeyConstants ;
import com.hanserwei.hannote.user.relation.biz.domain.dataobject.FansDO ;
import com.hanserwei.hannote.user.relation.biz.domain.dataobject.FollowingDO ;
import com.hanserwei.hannote.user.relation.biz.enums.FollowUnfollowTypeEnum ;
import com.hanserwei.hannote.user.relation.biz.model.dto.CountFollowUnfollowMqDTO ;
import com.hanserwei.hannote.user.relation.biz.model.dto.FollowUserMqDTO ;
import com.hanserwei.hannote.user.relation.biz.model.dto.UnfollowUserMqDTO ;
import com.hanserwei.hannote.user.relation.biz.service.FansDOService ;
@@ -15,13 +17,17 @@ import com.hanserwei.hannote.user.relation.biz.util.DateUtils;
import jakarta.annotation.Resource ;
import lombok.RequiredArgsConstructor ;
import lombok.extern.slf4j.Slf4j ;
import org.apache.rocketmq.client.producer.SendCallback ;
import org.apache.rocketmq.client.producer.SendResult ;
import org.apache.rocketmq.common.message.Message ;
import org.apache.rocketmq.spring.annotation.ConsumeMode ;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener ;
import org.apache.rocketmq.spring.core.RocketMQListener ;
import org.apache.rocketmq.spring.core.RocketMQTemplate ;
import org.springframework.core.io.ClassPathResource ;
import org.springframework.data.redis.core.RedisTemplate ;
import org.springframework.data.redis.core.script.DefaultRedisScript ;
import org.springframework.messaging.support.MessageBuilder ;
import org.springframework.scripting.support.ResourceScriptSource ;
import org.springframework.stereotype.Component ;
import org.springframework.transaction.support.TransactionTemplate ;
@@ -32,8 +38,8 @@ import java.util.Objects;
@Component
@RocketMQMessageListener (
consumerGroup = " han_note_group_ " + MQConstants . TOPIC_FOLLOW_OR_UNFOLLOW ,
topic = MQConstants . TOPIC_FOLLOW_OR_UNFOLLOW ,
consumerGroup = " han_note_group_ " + MQConstants . TOPIC_FOLLOW_OR_UNFOLLOW , //han_note_group_FollowUnfollowTopic
topic = MQConstants . TOPIC_FOLLOW_OR_UNFOLLOW , //FollowUnfollowTopic
consumeMode = ConsumeMode . ORDERLY
)
@Slf4j
@@ -47,6 +53,8 @@ public class FollowUnfollowConsumer implements RocketMQListener<Message> {
private RateLimiter rateLimiter ;
@Resource
private RedisTemplate < Object , Object > redisTemplate ;
@Resource
private RocketMQTemplate rocketMQTemplate ;
@Override
public void onMessage ( Message message ) {
@@ -114,6 +122,17 @@ public class FollowUnfollowConsumer implements RocketMQListener<Message> {
String fansRedisKey = RedisKeyConstants . buildUserFansKey ( unfollowUserId ) ;
// 删除指定粉丝
redisTemplate . opsForZSet ( ) . remove ( fansRedisKey , userId ) ;
// 发送MQ消息通知计数服务, 统计关注数
// 构建DTO对象
CountFollowUnfollowMqDTO countFollowUnfollowMqDTO = CountFollowUnfollowMqDTO . builder ( )
. userId ( userId )
. targetUserId ( unfollowUserId )
. type ( FollowUnfollowTypeEnum . UNFOLLOW . getCode ( ) )
. build ( ) ;
// 发送MQ
sendMQ ( countFollowUnfollowMqDTO ) ;
}
}
@@ -177,6 +196,53 @@ public class FollowUnfollowConsumer implements RocketMQListener<Message> {
// 执行Lua脚本
redisTemplate . execute ( script , Collections . singletonList ( fansZSetKey ) , userId , timestamp ) ;
// 发送MQ消息通知计数服务, 统计关注数
// 构建消息体
CountFollowUnfollowMqDTO countFollowUnfollowMqDTO = CountFollowUnfollowMqDTO . builder ( )
. userId ( userId )
. targetUserId ( followUserId )
. type ( FollowUnfollowTypeEnum . FOLLOW . getCode ( ) )
. build ( ) ;
sendMQ ( countFollowUnfollowMqDTO ) ;
}
}
/**
* 发送MQ消息
*
* @param countFollowUnfollowMqDTO 消息体
*/
private void sendMQ ( CountFollowUnfollowMqDTO countFollowUnfollowMqDTO ) {
// 构建MQ消息体
org . springframework . messaging . Message < String > message = MessageBuilder . withPayload ( JsonUtils . toJsonString ( countFollowUnfollowMqDTO ) )
. build ( ) ;
// 异步发送 MQ 消息
rocketMQTemplate . asyncSend ( MQConstants . TOPIC_COUNT_FOLLOWING , message , new SendCallback ( ) {
@Override
public void onSuccess ( SendResult sendResult ) {
log . info ( " ==> 【计数服务: 关注数】MQ 发送成功, SendResult: {} " , sendResult ) ;
}
@Override
public void onException ( Throwable throwable ) {
log . error ( " ==> 【计数服务: 关注数】MQ 发送异常: " , throwable ) ;
}
} ) ;
// 发送 MQ 通知计数服务:统计粉丝数
rocketMQTemplate . asyncSend ( MQConstants . TOPIC_COUNT_FANS , message , new SendCallback ( ) {
@Override
public void onSuccess ( SendResult sendResult ) {
log . info ( " ==> 【计数服务: 粉丝数】MQ 发送成功, SendResult: {} " , sendResult ) ;
}
@Override
public void onException ( Throwable throwable ) {
log . error ( " ==> 【计数服务: 粉丝数】MQ 发送异常: " , throwable ) ;
}
} ) ;
}
}