diff --git a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/DeleteNoteLocalCacheConsumer.java b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/DeleteNoteLocalCacheConsumer.java index 2f0a3fd..e9d75cf 100644 --- a/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/DeleteNoteLocalCacheConsumer.java +++ b/han-note-note/han-note-note-biz/src/main/java/com/hanserwei/hannote/note/biz/comsumer/DeleteNoteLocalCacheConsumer.java @@ -10,7 +10,7 @@ import org.springframework.stereotype.Component; @Component @Slf4j @RocketMQMessageListener( - consumerGroup = "han_note_group", + consumerGroup = "han_note_group_" + MQConstants.TOPIC_DELETE_NOTE_LOCAL_CACHE, topic = MQConstants.TOPIC_DELETE_NOTE_LOCAL_CACHE, messageModel = MessageModel.BROADCASTING ) diff --git a/han-note-user-relation/han-note-user-relation-biz/src/main/java/com/hanserwei/hannote/user/relation/biz/consumer/FollowUnfollowConsumer.java b/han-note-user-relation/han-note-user-relation-biz/src/main/java/com/hanserwei/hannote/user/relation/biz/consumer/FollowUnfollowConsumer.java new file mode 100644 index 0000000..5e6e052 --- /dev/null +++ b/han-note-user-relation/han-note-user-relation-biz/src/main/java/com/hanserwei/hannote/user/relation/biz/consumer/FollowUnfollowConsumer.java @@ -0,0 +1,102 @@ +package com.hanserwei.hannote.user.relation.biz.consumer; + +import com.hanserwei.framework.common.utils.JsonUtils; +import com.hanserwei.hannote.user.relation.biz.constant.MQConstants; +import com.hanserwei.hannote.user.relation.biz.domain.dataobject.FansDO; +import com.hanserwei.hannote.user.relation.biz.domain.dataobject.FollowingDO; +import com.hanserwei.hannote.user.relation.biz.model.dto.FollowUserMqDTO; +import com.hanserwei.hannote.user.relation.biz.service.FansDOService; +import com.hanserwei.hannote.user.relation.biz.service.FollowingDOService; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; +import org.springframework.transaction.support.TransactionTemplate; + +import java.time.LocalDateTime; +import java.util.Objects; + +@Component +@RocketMQMessageListener( + consumerGroup = "han_note_group_" + MQConstants.TOPIC_FOLLOW_OR_UNFOLLOW, + topic = MQConstants.TOPIC_FOLLOW_OR_UNFOLLOW +) +@Slf4j +public class FollowUnfollowConsumer implements RocketMQListener { + private final TransactionTemplate transactionTemplate; + private final FollowingDOService followingDOService; + private final FansDOService fansDOService; + + public FollowUnfollowConsumer(TransactionTemplate transactionTemplate, FollowingDOService followingDOService, FansDOService fansDOService) { + this.transactionTemplate = transactionTemplate; + this.followingDOService = followingDOService; + this.fansDOService = fansDOService; + } + + @Override + public void onMessage(Message message) { + // 消息体 + String bodyJsonStr = new String(message.getBody()); + // 标签 + String tags = message.getTags(); + + log.info("==> FollowUnfollowConsumer 消费了消息 {}, tags: {}", bodyJsonStr, tags); + // 根据MQ标签判断操作类型 + if (Objects.equals(tags, MQConstants.TAG_FOLLOW)){ + // 关注 + handleFollowTagMessage(bodyJsonStr); + } else if (Objects.equals(tags, MQConstants.TAG_UNFOLLOW)) { + // 取关 + // TODO: 待实现 + } + } + + /** + * 关注 + * @param bodyJsonStr 消息体 + */ + private void handleFollowTagMessage(String bodyJsonStr) { + // 解析消息体转换为DTO对象 + FollowUserMqDTO followUserMqDTO = JsonUtils.parseObject(bodyJsonStr, FollowUserMqDTO.class); + + // 判空 + if (Objects.isNull(followUserMqDTO)) { + return; + } + + // 幂等性:通过联合唯一索引保证 + + Long userId = followUserMqDTO.getUserId(); + Long followUserId = followUserMqDTO.getFollowUserId(); + LocalDateTime createTime = followUserMqDTO.getCreateTime(); + + // 编程式事物 + boolean isSuccess = Boolean.TRUE.equals(transactionTemplate.execute(status -> { + try { + // 关注成功需往数据库添加两条记录 + // 关注表:一条记录 + boolean followRecordSaved = followingDOService.save(FollowingDO.builder() + .userId(userId) + .followingUserId(followUserId) + .createTime(createTime) + .build()); + // 粉丝表:一条记录 + if (followRecordSaved){ + return fansDOService.save(FansDO.builder() + .userId(followUserId) + .fansUserId(userId) + .createTime(createTime) + .build()); + } + }catch (Exception e){ + status.setRollbackOnly(); + log.error("## 添加关注关系失败, userId: {}, followUserId: {}, createTime: {}", userId, followUserId, createTime); + } + return false; + })); + + log.info("## 数据库添加记录结果: {}", isSuccess); + // TODO: 更新 Redis 中被关注用户的 ZSet 粉丝列表 + } +} diff --git a/http-client/gateApi.http b/http-client/gateApi.http index 10b35a1..d6c98c7 100644 --- a/http-client/gateApi.http +++ b/http-client/gateApi.http @@ -3,7 +3,7 @@ POST http://localhost:8000/auth/verification/code/send Content-Type: application/json { - "email": "ssw010723@gmail.com" + "email": "2628273921@qq.com" } ### 登录/注册 @@ -11,8 +11,8 @@ POST http://localhost:8000/auth/login Content-Type: application/json { - "email": "ssw010723@gmail.com", - "code": "135466", + "email": "2628273921@qq.com", + "code": "825004", "type": 1 } @@ -144,5 +144,5 @@ Content-Type: application/json Authorization: Bearer {{token}} { - "followUserId":{{otherUserId}} + "followUserId": {{otherUserId}} } \ No newline at end of file diff --git a/sql/createTable.sql b/sql/createTable.sql index 92fec34..e035da5 100644 --- a/sql/createTable.sql +++ b/sql/createTable.sql @@ -175,4 +175,8 @@ CREATE TABLE `t_fans` DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci COMMENT ='用户粉丝表'; +ALTER TABLE t_following ADD UNIQUE uk_user_id_following_user_id(user_id, following_user_id); + +ALTER TABLE t_fans ADD UNIQUE uk_user_id_fans_user_id(user_id, fans_user_id); +