Compare commits
5 Commits
efd2e51d24
...
5c4d8862a2
| Author | SHA1 | Date | |
|---|---|---|---|
| 5c4d8862a2 | |||
| a6f4d437d2 | |||
| c1c0590cce | |||
| 8a1681e590 | |||
| f217b8133a |
3
.idea/MyBatisCodeHelperDatasource.xml
generated
3
.idea/MyBatisCodeHelperDatasource.xml
generated
@@ -20,10 +20,11 @@
|
|||||||
<option name="lombokDataAnnotation" value="true" />
|
<option name="lombokDataAnnotation" value="true" />
|
||||||
<option name="lombokNoArgsConstructor" value="true" />
|
<option name="lombokNoArgsConstructor" value="true" />
|
||||||
<option name="mapperAnnotaion" value="true" />
|
<option name="mapperAnnotaion" value="true" />
|
||||||
<option name="mapperFilesFolder" value="$PROJECT_DIR$/han-note-auth/src/main/resources/mapperxml" />
|
<option name="mapperFilesFolder" value="$PROJECT_DIR$/han-note-data-align/src/main/resources/mapperxml" />
|
||||||
<option name="mapperFilesFolderList">
|
<option name="mapperFilesFolderList">
|
||||||
<list>
|
<list>
|
||||||
<option value="$PROJECT_DIR$/han-note-auth/src/main/resources/mapperxml" />
|
<option value="$PROJECT_DIR$/han-note-auth/src/main/resources/mapperxml" />
|
||||||
|
<option value="$PROJECT_DIR$/han-note-data-align/src/main/resources/mapperxml" />
|
||||||
</list>
|
</list>
|
||||||
</option>
|
</option>
|
||||||
<option name="moduleNameToPackageAndPathMap">
|
<option name="moduleNameToPackageAndPathMap">
|
||||||
|
|||||||
1
.idea/data_source_mapping.xml
generated
1
.idea/data_source_mapping.xml
generated
@@ -3,6 +3,7 @@
|
|||||||
<component name="DataSourcePerFileMappings">
|
<component name="DataSourcePerFileMappings">
|
||||||
<file url="file://$APPLICATION_CONFIG_DIR$/consoles/db/5b969fbe-0f66-42be-8d30-ff21036ab8a4/console.sql" value="5b969fbe-0f66-42be-8d30-ff21036ab8a4" />
|
<file url="file://$APPLICATION_CONFIG_DIR$/consoles/db/5b969fbe-0f66-42be-8d30-ff21036ab8a4/console.sql" value="5b969fbe-0f66-42be-8d30-ff21036ab8a4" />
|
||||||
<file url="file://$APPLICATION_CONFIG_DIR$/consoles/db/f2474a4a-e4f1-4afa-bd43-7ae7738b47c5/console.sql" value="f2474a4a-e4f1-4afa-bd43-7ae7738b47c5" />
|
<file url="file://$APPLICATION_CONFIG_DIR$/consoles/db/f2474a4a-e4f1-4afa-bd43-7ae7738b47c5/console.sql" value="f2474a4a-e4f1-4afa-bd43-7ae7738b47c5" />
|
||||||
|
<file url="file://$PROJECT_DIR$/han-note-data-align/src/main/resources/mapperxml/InsertRecordMapper.xml" value="f2474a4a-e4f1-4afa-bd43-7ae7738b47c5" />
|
||||||
<file url="file://$PROJECT_DIR$/sql/createData.sql" value="f2474a4a-e4f1-4afa-bd43-7ae7738b47c5" />
|
<file url="file://$PROJECT_DIR$/sql/createData.sql" value="f2474a4a-e4f1-4afa-bd43-7ae7738b47c5" />
|
||||||
<file url="file://$PROJECT_DIR$/sql/createTable.sql" value="f2474a4a-e4f1-4afa-bd43-7ae7738b47c5" />
|
<file url="file://$PROJECT_DIR$/sql/createTable.sql" value="f2474a4a-e4f1-4afa-bd43-7ae7738b47c5" />
|
||||||
<file url="file://$PROJECT_DIR$/sql/leafcreatetable.sql" value="c4c1f1dc-816f-4113-88d6-9ebd7677af82" />
|
<file url="file://$PROJECT_DIR$/sql/leafcreatetable.sql" value="c4c1f1dc-816f-4113-88d6-9ebd7677af82" />
|
||||||
|
|||||||
1
.idea/dictionaries/project.xml
generated
1
.idea/dictionaries/project.xml
generated
@@ -4,6 +4,7 @@
|
|||||||
<w>asyn</w>
|
<w>asyn</w>
|
||||||
<w>hannote</w>
|
<w>hannote</w>
|
||||||
<w>hanserwei</w>
|
<w>hanserwei</w>
|
||||||
|
<w>jobhandler</w>
|
||||||
<w>nacos</w>
|
<w>nacos</w>
|
||||||
<w>operationlog</w>
|
<w>operationlog</w>
|
||||||
<w>rustfs</w>
|
<w>rustfs</w>
|
||||||
|
|||||||
3
.idea/inspectionProfiles/Project_Default.xml
generated
3
.idea/inspectionProfiles/Project_Default.xml
generated
@@ -24,5 +24,8 @@
|
|||||||
</option>
|
</option>
|
||||||
</inspection_tool>
|
</inspection_tool>
|
||||||
<inspection_tool class="SqlNoDataSourceInspection" enabled="false" level="WARNING" enabled_by_default="false" />
|
<inspection_tool class="SqlNoDataSourceInspection" enabled="false" level="WARNING" enabled_by_default="false" />
|
||||||
|
<inspection_tool class="SqlResolveInspection" enabled="true" level="ERROR" enabled_by_default="true">
|
||||||
|
<option name="suppressForPossibleStringLiterals" value="true" />
|
||||||
|
</inspection_tool>
|
||||||
</profile>
|
</profile>
|
||||||
</component>
|
</component>
|
||||||
@@ -17,7 +17,7 @@ import java.util.Objects;
|
|||||||
|
|
||||||
@Component
|
@Component
|
||||||
@SuppressWarnings("ALL")
|
@SuppressWarnings("ALL")
|
||||||
@RocketMQMessageListener(consumerGroup = "han_note_" + MQConstants.TOPIC_COUNT_FOLLOWING_2_DB, // Group 组
|
@RocketMQMessageListener(consumerGroup = "han_note_group_" + MQConstants.TOPIC_COUNT_FOLLOWING_2_DB, // Group 组
|
||||||
topic = MQConstants.TOPIC_COUNT_FOLLOWING_2_DB // 主题 Topic
|
topic = MQConstants.TOPIC_COUNT_FOLLOWING_2_DB // 主题 Topic
|
||||||
)
|
)
|
||||||
@Slf4j
|
@Slf4j
|
||||||
|
|||||||
@@ -83,6 +83,19 @@
|
|||||||
<groupId>com.xuxueli</groupId>
|
<groupId>com.xuxueli</groupId>
|
||||||
<artifactId>xxl-job-core</artifactId>
|
<artifactId>xxl-job-core</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<!-- Jackson 组件 -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.hanserwei</groupId>
|
||||||
|
<artifactId>hanserwei-spring-boot-starter-jackson</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<!-- Rocket MQ -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.rocketmq</groupId>
|
||||||
|
<artifactId>rocketmq-spring-boot-starter</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|||||||
@@ -0,0 +1,31 @@
|
|||||||
|
package com.hanserwei.hannote.data.align.config;
|
||||||
|
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||||
|
import org.springframework.data.redis.core.RedisTemplate;
|
||||||
|
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
|
||||||
|
import org.springframework.data.redis.serializer.StringRedisSerializer;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
public class RedisTemplateConfig {
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
|
||||||
|
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
|
||||||
|
// 设置 RedisTemplate 的连接工厂
|
||||||
|
redisTemplate.setConnectionFactory(connectionFactory);
|
||||||
|
|
||||||
|
// 使用 StringRedisSerializer 来序列化和反序列化 redis 的 key 值,确保 key 是可读的字符串
|
||||||
|
redisTemplate.setKeySerializer(new StringRedisSerializer());
|
||||||
|
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
|
||||||
|
|
||||||
|
// 使用 Jackson2JsonRedisSerializer 来序列化和反序列化 redis 的 value 值, 确保存储的是 JSON 格式
|
||||||
|
Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
|
||||||
|
redisTemplate.setValueSerializer(serializer);
|
||||||
|
redisTemplate.setHashValueSerializer(serializer);
|
||||||
|
|
||||||
|
redisTemplate.afterPropertiesSet();
|
||||||
|
return redisTemplate;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,10 @@
|
|||||||
|
package com.hanserwei.hannote.data.align.config;
|
||||||
|
|
||||||
|
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.context.annotation.Import;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
@Import(RocketMQAutoConfiguration.class)
|
||||||
|
public class RocketMQConfig {
|
||||||
|
}
|
||||||
@@ -0,0 +1,15 @@
|
|||||||
|
package com.hanserwei.hannote.data.align.constant;
|
||||||
|
|
||||||
|
public interface MQConstants {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Topic: 计数 - 笔记点赞数
|
||||||
|
*/
|
||||||
|
String TOPIC_COUNT_NOTE_LIKE = "CountNoteLikeTopic";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Topic: 计数 - 笔记收藏数
|
||||||
|
*/
|
||||||
|
String TOPIC_COUNT_NOTE_COLLECT = "CountNoteCollectTopic";
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,36 @@
|
|||||||
|
package com.hanserwei.hannote.data.align.constant;
|
||||||
|
|
||||||
|
public class RedisKeyConstants {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 布隆过滤器:日增量变更数据,用户笔记点赞,取消点赞 前缀
|
||||||
|
*/
|
||||||
|
public static final String BLOOM_TODAY_NOTE_LIKE_LIST_KEY = "bloom:dataAlign:note:likes:";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 布隆过滤器:日增量变更数据,用户笔记收藏,取消收藏 前缀
|
||||||
|
*/
|
||||||
|
public static final String BLOOM_TODAY_NOTE_COLLECT_LIST_KEY = "bloom:dataAlign:note:collects:";
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 构建完整的布隆过滤器:日增量变更数据,用户笔记点赞,取消点赞 KEY
|
||||||
|
*
|
||||||
|
* @param date 日期
|
||||||
|
* @return 完整的布隆过滤器:日增量变更数据,用户笔记点赞,取消点赞 KEY
|
||||||
|
*/
|
||||||
|
public static String buildBloomUserNoteLikeListKey(String date) {
|
||||||
|
return BLOOM_TODAY_NOTE_LIKE_LIST_KEY + date;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 构建完整的布隆过滤器:日增量变更数据,用户笔记收藏,取消收藏 KEY
|
||||||
|
*
|
||||||
|
* @param date 日期
|
||||||
|
* @return 完整的布隆过滤器:日增量变更数据,用户笔记收藏,取消收藏 KEY
|
||||||
|
*/
|
||||||
|
public static String buildBloomUserNoteCollectListKey(String date) {
|
||||||
|
return BLOOM_TODAY_NOTE_COLLECT_LIST_KEY + date;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,21 @@
|
|||||||
|
package com.hanserwei.hannote.data.align.constant;
|
||||||
|
|
||||||
|
public class TableConstants {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 表名中的分隔符
|
||||||
|
*/
|
||||||
|
private static final String TABLE_NAME_SEPARATE = "_";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 拼接表名后缀
|
||||||
|
*
|
||||||
|
* @param hashKey 哈希Keu
|
||||||
|
* @return 表名后缀
|
||||||
|
*/
|
||||||
|
public static String buildTableNameSuffix(String date, long hashKey) {
|
||||||
|
// 拼接完整的表名
|
||||||
|
return date + TABLE_NAME_SEPARATE + hashKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,111 @@
|
|||||||
|
package com.hanserwei.hannote.data.align.consumer;
|
||||||
|
|
||||||
|
import com.hanserwei.framework.common.utils.JsonUtils;
|
||||||
|
import com.hanserwei.hannote.data.align.constant.MQConstants;
|
||||||
|
import com.hanserwei.hannote.data.align.constant.RedisKeyConstants;
|
||||||
|
import com.hanserwei.hannote.data.align.constant.TableConstants;
|
||||||
|
import com.hanserwei.hannote.data.align.domain.mapper.InsertRecordMapper;
|
||||||
|
import com.hanserwei.hannote.data.align.model.vo.CollectUnCollectNoteMqDTO;
|
||||||
|
import jakarta.annotation.Resource;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||||
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.core.io.ClassPathResource;
|
||||||
|
import org.springframework.data.redis.core.RedisTemplate;
|
||||||
|
import org.springframework.data.redis.core.script.DefaultRedisScript;
|
||||||
|
import org.springframework.data.redis.core.script.RedisScript;
|
||||||
|
import org.springframework.scripting.support.ResourceScriptSource;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.transaction.support.TransactionTemplate;
|
||||||
|
|
||||||
|
import java.time.LocalDate;
|
||||||
|
import java.time.format.DateTimeFormatter;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@Slf4j
|
||||||
|
@RocketMQMessageListener(
|
||||||
|
consumerGroup = "han_note_group_data_align_" + MQConstants.TOPIC_COUNT_NOTE_COLLECT,
|
||||||
|
topic = MQConstants.TOPIC_COUNT_NOTE_COLLECT
|
||||||
|
)
|
||||||
|
public class TodayNoteCollectIncrementData2DBConsumer implements RocketMQListener<String> {
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private RedisTemplate<String, Object> redisTemplate;
|
||||||
|
@Resource
|
||||||
|
private TransactionTemplate transactionTemplate;
|
||||||
|
@Resource
|
||||||
|
private InsertRecordMapper insertRecordMapper;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 表总分片数
|
||||||
|
*/
|
||||||
|
@Value("${table.shards}")
|
||||||
|
private int tableShards;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onMessage(String body) {
|
||||||
|
log.info("## TodayNoteCollectIncrementData2DBConsumer 消费到了 MQ: {}", body);
|
||||||
|
// 1. 布隆过滤器判断该日增量数据是否已经记录
|
||||||
|
// 消息体转DTO
|
||||||
|
CollectUnCollectNoteMqDTO collectUnCollectNoteMqDTO = JsonUtils.parseObject(body, CollectUnCollectNoteMqDTO.class);
|
||||||
|
if (Objects.isNull(collectUnCollectNoteMqDTO)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
log.info("## TodayNoteCollectIncrementData2DBConsumer 笔记收藏数据:{}", JsonUtils.toJsonString(collectUnCollectNoteMqDTO));
|
||||||
|
|
||||||
|
// 被收藏的笔记ID
|
||||||
|
Long noteId = collectUnCollectNoteMqDTO.getNoteId();
|
||||||
|
// 笔记作者ID
|
||||||
|
Long noteCreatorId = collectUnCollectNoteMqDTO.getNoteCreatorId();
|
||||||
|
// 今日日期
|
||||||
|
String date = LocalDate.now()
|
||||||
|
.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
|
||||||
|
|
||||||
|
String bloomKey = RedisKeyConstants.buildBloomUserNoteCollectListKey(date);
|
||||||
|
|
||||||
|
// 1. 布隆过滤器判断该日增量数据是否已经记录
|
||||||
|
DefaultRedisScript<Long> script = new DefaultRedisScript<>();
|
||||||
|
// Lua 脚本路径
|
||||||
|
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/lua/bloom_today_note_collect_check.lua")));
|
||||||
|
// 返回值类型
|
||||||
|
script.setResultType(Long.class);
|
||||||
|
|
||||||
|
// 执行 Lua 脚本,拿到返回结果
|
||||||
|
Long result = redisTemplate.execute(script, Collections.singletonList(bloomKey), noteId);
|
||||||
|
log.info("布隆过滤器判断结果:{}", result);
|
||||||
|
|
||||||
|
// 若布隆过滤器判断不存在(绝对正确)
|
||||||
|
if (Objects.equals(result, 0L)) {
|
||||||
|
// 2. 若无,才会落库,减轻数据库压力
|
||||||
|
// 根据分片总数,取模,分别获取对应的分片序号
|
||||||
|
long userIdHashKey = noteCreatorId % tableShards;
|
||||||
|
long noteIdHashKey = noteId % tableShards;
|
||||||
|
log.info("根据分片总数,取模,分别获取对应的分片序号user:{},note:{}", userIdHashKey, noteIdHashKey);
|
||||||
|
|
||||||
|
// 编程式事务,保证多语句的原子性
|
||||||
|
transactionTemplate.execute(status -> {
|
||||||
|
try {
|
||||||
|
// 将日增量变更数据,分别写入两张表
|
||||||
|
// - t_data_align_note_collect_count_temp_日期_分片序号
|
||||||
|
// - t_data_align_user_collect_count_temp_日期_分片序号
|
||||||
|
insertRecordMapper.insert2DataAlignNoteCollectCountTempTable(TableConstants.buildTableNameSuffix(date, noteIdHashKey), noteId);
|
||||||
|
insertRecordMapper.insert2DataAlignUserCollectCountTempTable(TableConstants.buildTableNameSuffix(date, userIdHashKey), noteCreatorId);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
} catch (Exception ex) {
|
||||||
|
status.setRollbackOnly(); // 标记事务为回滚
|
||||||
|
log.error("", ex);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
});
|
||||||
|
|
||||||
|
// 3. 数据库写入成功后,再添加布隆过滤器中
|
||||||
|
// 4. 数据库写入成功后,再添加布隆过滤器中
|
||||||
|
RedisScript<Long> bloomAddScript = RedisScript.of("return redis.call('BF.ADD', KEYS[1], ARGV[1])", Long.class);
|
||||||
|
redisTemplate.execute(bloomAddScript, Collections.singletonList(bloomKey), noteId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,108 @@
|
|||||||
|
package com.hanserwei.hannote.data.align.consumer;
|
||||||
|
|
||||||
|
import com.hanserwei.framework.common.utils.JsonUtils;
|
||||||
|
import com.hanserwei.hannote.data.align.constant.MQConstants;
|
||||||
|
import com.hanserwei.hannote.data.align.constant.RedisKeyConstants;
|
||||||
|
import com.hanserwei.hannote.data.align.constant.TableConstants;
|
||||||
|
import com.hanserwei.hannote.data.align.domain.mapper.InsertRecordMapper;
|
||||||
|
import com.hanserwei.hannote.data.align.model.vo.LikeUnlikeNoteMqDTO;
|
||||||
|
import jakarta.annotation.Resource;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||||
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.core.io.ClassPathResource;
|
||||||
|
import org.springframework.data.redis.core.RedisTemplate;
|
||||||
|
import org.springframework.data.redis.core.script.DefaultRedisScript;
|
||||||
|
import org.springframework.data.redis.core.script.RedisScript;
|
||||||
|
import org.springframework.scripting.support.ResourceScriptSource;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.transaction.support.TransactionTemplate;
|
||||||
|
|
||||||
|
import java.time.LocalDate;
|
||||||
|
import java.time.format.DateTimeFormatter;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@Slf4j
|
||||||
|
@RocketMQMessageListener(
|
||||||
|
consumerGroup = "han_note_group_data_align_" + MQConstants.TOPIC_COUNT_NOTE_LIKE,
|
||||||
|
topic = MQConstants.TOPIC_COUNT_NOTE_LIKE
|
||||||
|
)
|
||||||
|
public class TodayNoteLikeIncrementData2DBConsumer implements RocketMQListener<String> {
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private RedisTemplate<String, Object> redisTemplate;
|
||||||
|
@Resource
|
||||||
|
private TransactionTemplate transactionTemplate;
|
||||||
|
@Resource
|
||||||
|
private InsertRecordMapper insertRecordMapper;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 表总分片数
|
||||||
|
*/
|
||||||
|
@Value("${table.shards}")
|
||||||
|
private int tableShards;
|
||||||
|
@Override
|
||||||
|
public void onMessage(String body) {
|
||||||
|
log.info("## TodayNoteLikeIncrementData2DBConsumer 消费到了 MQ: {}", body);
|
||||||
|
// 1. 布隆过滤器判断该日增量数据是否已经记录
|
||||||
|
// Json字符串转DTO
|
||||||
|
LikeUnlikeNoteMqDTO noteLikeCountMqDTO = JsonUtils.parseObject(body, LikeUnlikeNoteMqDTO.class);
|
||||||
|
if (Objects.isNull(noteLikeCountMqDTO)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
log.info("## TodayNoteLikeIncrementData2DBConsumer 笔记点赞数据:{}", JsonUtils.toJsonString(noteLikeCountMqDTO));
|
||||||
|
// 获取被点赞或者取消点赞的笔记ID
|
||||||
|
Long noteId = noteLikeCountMqDTO.getNoteId();
|
||||||
|
// 获取点赞或取消点赞的笔记的创建者ID
|
||||||
|
Long noteCreatorId = noteLikeCountMqDTO.getNoteCreatorId();
|
||||||
|
|
||||||
|
// 今日日期
|
||||||
|
String date = LocalDate.now()
|
||||||
|
.format(DateTimeFormatter.ofPattern("yyyyMMdd")); // 转字符串
|
||||||
|
|
||||||
|
String bloomKey = RedisKeyConstants.buildBloomUserNoteLikeListKey(date);
|
||||||
|
|
||||||
|
// 1. 布隆过滤器判断该日增量数据是否已经记录
|
||||||
|
DefaultRedisScript<Long> script = new DefaultRedisScript<>();
|
||||||
|
// Lua 脚本路径
|
||||||
|
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/lua/bloom_today_note_like_check.lua")));
|
||||||
|
// 返回值类型
|
||||||
|
script.setResultType(Long.class);
|
||||||
|
|
||||||
|
// 执行 Lua 脚本,拿到返回结果
|
||||||
|
Long result = redisTemplate.execute(script, Collections.singletonList(bloomKey), noteId);
|
||||||
|
log.info("布隆过滤器判断结果:{}", result);
|
||||||
|
|
||||||
|
// 若布隆过滤器判断不存在(绝对正确)
|
||||||
|
if (Objects.equals(result, 0L)) {
|
||||||
|
// 2. 若无,才会落库,减轻数据库压力
|
||||||
|
// 根据分片总数,取模,分别获取对应的分片序号
|
||||||
|
long userIdHashKey = noteCreatorId % tableShards;
|
||||||
|
long noteIdHashKey = noteId % tableShards;
|
||||||
|
log.info("根据分片总数,取模,分别获取对应的分片序号user:{},note:{}", userIdHashKey, noteIdHashKey);
|
||||||
|
|
||||||
|
// 编程式事务,保证多语句的原子性
|
||||||
|
transactionTemplate.execute(status -> {
|
||||||
|
try {
|
||||||
|
// 将日增量变更数据,分别写入两张表
|
||||||
|
// - t_data_align_note_like_count_temp_日期_分片序号
|
||||||
|
// - t_data_align_user_like_count_temp_日期_分片序号
|
||||||
|
insertRecordMapper.insert2DataAlignNoteLikeCountTempTable(TableConstants.buildTableNameSuffix(date, noteIdHashKey), noteId);
|
||||||
|
insertRecordMapper.insert2DataAlignUserLikeCountTempTable(TableConstants.buildTableNameSuffix(date, userIdHashKey), noteCreatorId);
|
||||||
|
return true;
|
||||||
|
} catch (Exception ex) {
|
||||||
|
status.setRollbackOnly();
|
||||||
|
log.error("## TodayNoteLikeIncrementData2DBConsumer 落库失败,回滚事务", ex);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
});
|
||||||
|
// 3. 数据库写入成功后,再添加布隆过滤器中
|
||||||
|
// 4. 数据库写入成功后,再添加布隆过滤器中
|
||||||
|
RedisScript<Long> bloomAddScript = RedisScript.of("return redis.call('BF.ADD', KEYS[1], ARGV[1])", Long.class);
|
||||||
|
redisTemplate.execute(bloomAddScript, Collections.singletonList(bloomKey), noteId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,60 @@
|
|||||||
|
package com.hanserwei.hannote.data.align.domain.mapper;
|
||||||
|
|
||||||
|
import org.apache.ibatis.annotations.Mapper;
|
||||||
|
import org.apache.ibatis.annotations.Param;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 自动创建表
|
||||||
|
*/
|
||||||
|
@Mapper
|
||||||
|
public interface CreateTableMapper {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建日增量表:关注数计数变更
|
||||||
|
*
|
||||||
|
* @param tableNameSuffix 表名后缀
|
||||||
|
*/
|
||||||
|
void createDataAlignFollowingCountTempTable(@Param("tableNameSuffix") String tableNameSuffix);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建日增量表:粉丝数计数变更
|
||||||
|
*
|
||||||
|
* @param tableNameSuffix 表名后缀
|
||||||
|
*/
|
||||||
|
void createDataAlignFansCountTempTable(@Param("tableNameSuffix") String tableNameSuffix);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建日增量表:笔记收藏数计数变更
|
||||||
|
*
|
||||||
|
* @param tableNameSuffix 表名后缀
|
||||||
|
*/
|
||||||
|
void createDataAlignNoteCollectCountTempTable(@Param("tableNameSuffix") String tableNameSuffix);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建日增量表:用户被收藏数计数变更
|
||||||
|
*
|
||||||
|
* @param tableNameSuffix 表名后缀
|
||||||
|
*/
|
||||||
|
void createDataAlignUserCollectCountTempTable(@Param("tableNameSuffix") String tableNameSuffix);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建日增量表:用户被点赞数计数变更
|
||||||
|
*
|
||||||
|
* @param tableNameSuffix 表名后缀
|
||||||
|
*/
|
||||||
|
void createDataAlignUserLikeCountTempTable(@Param("tableNameSuffix") String tableNameSuffix);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建日增量表:笔记点赞数计数变更
|
||||||
|
*
|
||||||
|
* @param tableNameSuffix 表名后缀
|
||||||
|
*/
|
||||||
|
void createDataAlignNoteLikeCountTempTable(@Param("tableNameSuffix") String tableNameSuffix);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建日增量表:笔记发布数计数变更
|
||||||
|
*
|
||||||
|
* @param tableNameSuffix 表名后缀
|
||||||
|
*/
|
||||||
|
void createDataAlignNotePublishCountTempTable(@Param("tableNameSuffix") String tableNameSuffix);
|
||||||
|
}
|
||||||
@@ -0,0 +1,58 @@
|
|||||||
|
package com.hanserwei.hannote.data.align.domain.mapper;
|
||||||
|
|
||||||
|
import org.apache.ibatis.annotations.Param;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 删除表
|
||||||
|
*/
|
||||||
|
public interface DeleteTableMapper {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 删除日增量表:关注数计数变更
|
||||||
|
*
|
||||||
|
* @param tableNameSuffix 表名后缀
|
||||||
|
*/
|
||||||
|
void deleteDataAlignFollowingCountTempTable(@Param("tableNameSuffix") String tableNameSuffix);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 删除日增量表:粉丝数计数变更
|
||||||
|
*
|
||||||
|
* @param tableNameSuffix 表名后缀
|
||||||
|
*/
|
||||||
|
void deleteDataAlignFansCountTempTable(@Param("tableNameSuffix") String tableNameSuffix);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 删除日增量表:笔记收藏数计数变更
|
||||||
|
*
|
||||||
|
* @param tableNameSuffix 表名后缀
|
||||||
|
*/
|
||||||
|
void deleteDataAlignNoteCollectCountTempTable(@Param("tableNameSuffix") String tableNameSuffix);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 删除日增量表:用户被收藏数计数变更
|
||||||
|
*
|
||||||
|
* @param tableNameSuffix 表名后缀
|
||||||
|
*/
|
||||||
|
void deleteDataAlignUserCollectCountTempTable(@Param("tableNameSuffix") String tableNameSuffix);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 删除日增量表:用户被点赞数计数变更
|
||||||
|
*
|
||||||
|
* @param tableNameSuffix 表名后缀
|
||||||
|
*/
|
||||||
|
void deleteDataAlignUserLikeCountTempTable(@Param("tableNameSuffix") String tableNameSuffix);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 删除日增量表:笔记点赞数计数变更
|
||||||
|
*
|
||||||
|
* @param tableNameSuffix 表名后缀
|
||||||
|
*/
|
||||||
|
void deleteDataAlignNoteLikeCountTempTable(@Param("tableNameSuffix") String tableNameSuffix);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 删除日增量表:笔记发布数计数变更
|
||||||
|
*
|
||||||
|
* @param tableNameSuffix 表名后缀
|
||||||
|
*/
|
||||||
|
void deleteDataAlignNotePublishCountTempTable(@Param("tableNameSuffix") String tableNameSuffix);
|
||||||
|
}
|
||||||
@@ -0,0 +1,29 @@
|
|||||||
|
package com.hanserwei.hannote.data.align.domain.mapper;
|
||||||
|
|
||||||
|
import org.apache.ibatis.annotations.Param;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 添加记录
|
||||||
|
*/
|
||||||
|
public interface InsertRecordMapper {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 笔记点赞数:计数变更
|
||||||
|
*/
|
||||||
|
void insert2DataAlignNoteLikeCountTempTable(@Param("tableNameSuffix") String tableNameSuffix, @Param("noteId") Long noteId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 用户获得的点赞数:计数变更
|
||||||
|
*/
|
||||||
|
void insert2DataAlignUserLikeCountTempTable(@Param("tableNameSuffix") String tableNameSuffix, @Param("userId") Long userId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 笔记收藏数:计数变更
|
||||||
|
*/
|
||||||
|
void insert2DataAlignNoteCollectCountTempTable(@Param("tableNameSuffix") String tableNameSuffix, @Param("noteId") Long noteId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 用户获得的收藏数:计数变更
|
||||||
|
*/
|
||||||
|
void insert2DataAlignUserCollectCountTempTable(@Param("tableNameSuffix") String tableNameSuffix, @Param("userId") Long userId);
|
||||||
|
}
|
||||||
@@ -1,20 +1,58 @@
|
|||||||
package com.hanserwei.hannote.data.align.job;
|
package com.hanserwei.hannote.data.align.job;
|
||||||
|
|
||||||
|
import com.hanserwei.hannote.data.align.constant.TableConstants;
|
||||||
|
import com.hanserwei.hannote.data.align.domain.mapper.CreateTableMapper;
|
||||||
import com.xxl.job.core.context.XxlJobHelper;
|
import com.xxl.job.core.context.XxlJobHelper;
|
||||||
import com.xxl.job.core.handler.annotation.XxlJob;
|
import com.xxl.job.core.handler.annotation.XxlJob;
|
||||||
|
import jakarta.annotation.Resource;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.cloud.context.config.annotation.RefreshScope;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.time.LocalDate;
|
||||||
|
import java.time.format.DateTimeFormatter;
|
||||||
|
|
||||||
@Component
|
@Component
|
||||||
|
@RefreshScope
|
||||||
|
@SuppressWarnings("unused")
|
||||||
public class CreateTableXxlJob {
|
public class CreateTableXxlJob {
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private CreateTableMapper createTableMapper;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 表总分片数
|
||||||
|
*/
|
||||||
|
@Value("${table.shards}")
|
||||||
|
private int tableShards;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 1、简单任务示例(Bean模式)
|
* 1、简单任务示例(Bean模式)
|
||||||
*/
|
*/
|
||||||
|
@SuppressWarnings("unused")
|
||||||
@XxlJob("createTableJobHandler")
|
@XxlJob("createTableJobHandler")
|
||||||
public void createTableJobHandler() throws Exception {
|
public void createTableJobHandler() {
|
||||||
XxlJobHelper.log("## 开始初始化明日增量数据表...");
|
XxlJobHelper.log("## 开始初始化明日增量数据表...");
|
||||||
|
String date = LocalDate.now().plusDays(1) // 明日的日期
|
||||||
|
.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
|
||||||
|
XxlJobHelper.log("## 开始创建日增量数据表,日期: {}...", date);
|
||||||
|
if (tableShards > 0) {
|
||||||
|
for (int hashKey = 0; hashKey < tableShards; hashKey++) {
|
||||||
|
// 表名后缀
|
||||||
|
String tableNameSuffix = TableConstants.buildTableNameSuffix(date, hashKey);
|
||||||
|
|
||||||
// TODO
|
// 创建表
|
||||||
|
// 创建表
|
||||||
|
createTableMapper.createDataAlignFollowingCountTempTable(tableNameSuffix);
|
||||||
|
createTableMapper.createDataAlignFansCountTempTable(tableNameSuffix);
|
||||||
|
createTableMapper.createDataAlignNoteCollectCountTempTable(tableNameSuffix);
|
||||||
|
createTableMapper.createDataAlignUserCollectCountTempTable(tableNameSuffix);
|
||||||
|
createTableMapper.createDataAlignUserLikeCountTempTable(tableNameSuffix);
|
||||||
|
createTableMapper.createDataAlignNoteLikeCountTempTable(tableNameSuffix);
|
||||||
|
createTableMapper.createDataAlignNotePublishCountTempTable(tableNameSuffix);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
XxlJobHelper.log("## 创建日增量数据表成功,表名后缀: {}...", date);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -0,0 +1,70 @@
|
|||||||
|
package com.hanserwei.hannote.data.align.job;
|
||||||
|
|
||||||
|
import com.hanserwei.hannote.data.align.constant.TableConstants;
|
||||||
|
import com.hanserwei.hannote.data.align.domain.mapper.DeleteTableMapper;
|
||||||
|
import com.xxl.job.core.context.XxlJobHelper;
|
||||||
|
import com.xxl.job.core.handler.annotation.XxlJob;
|
||||||
|
import jakarta.annotation.Resource;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.time.LocalDate;
|
||||||
|
import java.time.format.DateTimeFormatter;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 删除最近一个月的日增量临时表
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class DeleteTableXxlJob {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 表总分片数
|
||||||
|
*/
|
||||||
|
@Value("${table.shards}")
|
||||||
|
private int tableShards;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private DeleteTableMapper deleteTableMapper;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 1、简单任务示例(Bean模式)
|
||||||
|
*/
|
||||||
|
@XxlJob("deleteTableJobHandler")
|
||||||
|
public void deleteTableJobHandler() {
|
||||||
|
XxlJobHelper.log("## 开始删除最近一个月的日增量临时表");
|
||||||
|
// 今日
|
||||||
|
LocalDate today = LocalDate.now();
|
||||||
|
|
||||||
|
// 日期格式
|
||||||
|
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");
|
||||||
|
|
||||||
|
LocalDate startDate = today;
|
||||||
|
// 从昨天开始往前推一个月
|
||||||
|
LocalDate endDate = today.minusMonths(1);
|
||||||
|
|
||||||
|
// 循环最近一个月的日期,不包括今天
|
||||||
|
while (startDate.isAfter(endDate)) {
|
||||||
|
// 往前推一天
|
||||||
|
startDate = startDate.minusDays(1);
|
||||||
|
// 日期字符串
|
||||||
|
String date = startDate.format(formatter);
|
||||||
|
|
||||||
|
for (int hashKey = 0; hashKey < tableShards; hashKey++) {
|
||||||
|
// 表名后缀
|
||||||
|
String tableNameSuffix = TableConstants.buildTableNameSuffix(date, hashKey);
|
||||||
|
XxlJobHelper.log("删除表后缀: {}", tableNameSuffix);
|
||||||
|
|
||||||
|
// 删除表
|
||||||
|
deleteTableMapper.deleteDataAlignFollowingCountTempTable(tableNameSuffix);
|
||||||
|
deleteTableMapper.deleteDataAlignFansCountTempTable(tableNameSuffix);
|
||||||
|
deleteTableMapper.deleteDataAlignNoteCollectCountTempTable(tableNameSuffix);
|
||||||
|
deleteTableMapper.deleteDataAlignUserCollectCountTempTable(tableNameSuffix);
|
||||||
|
deleteTableMapper.deleteDataAlignUserLikeCountTempTable(tableNameSuffix);
|
||||||
|
deleteTableMapper.deleteDataAlignNoteLikeCountTempTable(tableNameSuffix);
|
||||||
|
deleteTableMapper.deleteDataAlignNotePublishCountTempTable(tableNameSuffix);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
XxlJobHelper.log("## 结束删除最近一个月的日增量临时表");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,31 @@
|
|||||||
|
package com.hanserwei.hannote.data.align.model.vo;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Builder;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@AllArgsConstructor
|
||||||
|
@NoArgsConstructor
|
||||||
|
@Builder
|
||||||
|
public class CollectUnCollectNoteMqDTO {
|
||||||
|
|
||||||
|
private Long userId;
|
||||||
|
|
||||||
|
private Long noteId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 0: 取消收藏, 1:收藏
|
||||||
|
*/
|
||||||
|
private Integer type;
|
||||||
|
|
||||||
|
private LocalDateTime createTime;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 笔记发布者 ID
|
||||||
|
*/
|
||||||
|
private Long noteCreatorId;
|
||||||
|
}
|
||||||
@@ -0,0 +1,31 @@
|
|||||||
|
package com.hanserwei.hannote.data.align.model.vo;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Builder;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@AllArgsConstructor
|
||||||
|
@NoArgsConstructor
|
||||||
|
@Builder
|
||||||
|
public class LikeUnlikeNoteMqDTO {
|
||||||
|
|
||||||
|
private Long userId;
|
||||||
|
|
||||||
|
private Long noteId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 0: 取消点赞, 1:点赞
|
||||||
|
*/
|
||||||
|
private Integer type;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 笔记发布者 ID
|
||||||
|
*/
|
||||||
|
private Long noteCreatorId;
|
||||||
|
|
||||||
|
private LocalDateTime createTime;
|
||||||
|
}
|
||||||
@@ -0,0 +1,16 @@
|
|||||||
|
-- LUA 脚本:日增量笔记收藏、取消收藏变更数据布隆过滤器
|
||||||
|
|
||||||
|
local key = KEYS[1] -- 操作的 Redis Key
|
||||||
|
local noteIdAndNoteCreatorId = ARGV[1] -- Redis Value
|
||||||
|
|
||||||
|
-- 使用 EXISTS 命令检查布隆过滤器是否存在
|
||||||
|
local exists = redis.call('EXISTS', key)
|
||||||
|
if exists == 0 then
|
||||||
|
-- 创建布隆过滤器
|
||||||
|
redis.call('BF.ADD', key, '')
|
||||||
|
-- 设置过期时间,一天后过期
|
||||||
|
redis.call("EXPIRE", key, 20 * 60 * 60)
|
||||||
|
end
|
||||||
|
|
||||||
|
-- 校验该变更数据是否已经存在(1 表示已存在,0 表示不存在)
|
||||||
|
return redis.call('BF.EXISTS', key, noteIdAndNoteCreatorId)
|
||||||
@@ -0,0 +1,16 @@
|
|||||||
|
-- LUA 脚本:自增量笔记点赞、取消点赞变更数据布隆过滤器
|
||||||
|
|
||||||
|
local key = KEYS[1] -- 操作的 Redis Key
|
||||||
|
local noteIdAndNoteCreatorId = ARGV[1] -- Redis Value
|
||||||
|
|
||||||
|
-- 使用 EXISTS 命令检查布隆过滤器是否存在
|
||||||
|
local exists = redis.call('EXISTS', key)
|
||||||
|
if exists == 0 then
|
||||||
|
-- 创建布隆过滤器
|
||||||
|
redis.call('BF.ADD', key, '')
|
||||||
|
-- 设置过期时间,一天后过期
|
||||||
|
redis.call("EXPIRE", key, 20 * 60 * 60)
|
||||||
|
end
|
||||||
|
|
||||||
|
-- 校验该变更数据是否已经存在(1 表示已存在,0 表示不存在)
|
||||||
|
return redis.call('BF.EXISTS', key, noteIdAndNoteCreatorId)
|
||||||
@@ -0,0 +1,86 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8" ?>
|
||||||
|
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
|
||||||
|
<mapper namespace="com.hanserwei.hannote.data.align.domain.mapper.CreateTableMapper">
|
||||||
|
<insert id="createDataAlignFollowingCountTempTable" parameterType="map">
|
||||||
|
CREATE TABLE IF NOT EXISTS `t_data_align_following_count_temp_${tableNameSuffix}`
|
||||||
|
(
|
||||||
|
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',
|
||||||
|
`user_id` bigint unsigned NOT NULL COMMENT '用户ID',
|
||||||
|
PRIMARY KEY (`id`) USING BTREE,
|
||||||
|
UNIQUE KEY `uk_user_id` (`user_id`)
|
||||||
|
) ENGINE = InnoDB
|
||||||
|
DEFAULT CHARSET = utf8mb4
|
||||||
|
COLLATE = utf8mb4_unicode_ci COMMENT ='数据对齐日增量表:关注数';
|
||||||
|
</insert>
|
||||||
|
<insert id="createDataAlignFansCountTempTable" parameterType="map">
|
||||||
|
CREATE TABLE IF NOT EXISTS `t_data_align_fans_count_temp_${tableNameSuffix}`
|
||||||
|
(
|
||||||
|
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',
|
||||||
|
`user_id` bigint unsigned NOT NULL COMMENT '用户ID',
|
||||||
|
PRIMARY KEY (`id`) USING BTREE,
|
||||||
|
UNIQUE KEY `uk_user_id` (`user_id`)
|
||||||
|
) ENGINE = InnoDB
|
||||||
|
DEFAULT CHARSET = utf8mb4
|
||||||
|
COLLATE = utf8mb4_unicode_ci COMMENT ='数据对齐日增量表:粉丝数';
|
||||||
|
</insert>
|
||||||
|
|
||||||
|
<insert id="createDataAlignNoteCollectCountTempTable" parameterType="map">
|
||||||
|
CREATE TABLE IF NOT EXISTS `t_data_align_note_collect_count_temp_${tableNameSuffix}`
|
||||||
|
(
|
||||||
|
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',
|
||||||
|
`note_id` bigint unsigned NOT NULL COMMENT '笔记ID',
|
||||||
|
PRIMARY KEY (`id`) USING BTREE,
|
||||||
|
UNIQUE KEY `uk_note_id` (`note_id`)
|
||||||
|
) ENGINE = InnoDB
|
||||||
|
DEFAULT CHARSET = utf8mb4
|
||||||
|
COLLATE = utf8mb4_unicode_ci COMMENT ='数据对齐日增量表:笔记获得收藏数';
|
||||||
|
</insert>
|
||||||
|
|
||||||
|
<insert id="createDataAlignUserCollectCountTempTable" parameterType="map">
|
||||||
|
CREATE TABLE IF NOT EXISTS `t_data_align_user_collect_count_temp_${tableNameSuffix}`
|
||||||
|
(
|
||||||
|
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',
|
||||||
|
`user_id` bigint unsigned NOT NULL COMMENT '用户ID',
|
||||||
|
PRIMARY KEY (`id`) USING BTREE,
|
||||||
|
UNIQUE KEY `uk_user_id` (`user_id`)
|
||||||
|
) ENGINE = InnoDB
|
||||||
|
DEFAULT CHARSET = utf8mb4
|
||||||
|
COLLATE = utf8mb4_unicode_ci COMMENT ='数据对齐日增量表:用户获得收藏数';
|
||||||
|
</insert>
|
||||||
|
|
||||||
|
<insert id="createDataAlignUserLikeCountTempTable" parameterType="map">
|
||||||
|
CREATE TABLE IF NOT EXISTS `t_data_align_user_like_count_temp_${tableNameSuffix}`
|
||||||
|
(
|
||||||
|
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',
|
||||||
|
`user_id` bigint unsigned NOT NULL COMMENT '用户ID',
|
||||||
|
PRIMARY KEY (`id`) USING BTREE,
|
||||||
|
UNIQUE KEY `uk_user_id` (`user_id`)
|
||||||
|
) ENGINE = InnoDB
|
||||||
|
DEFAULT CHARSET = utf8mb4
|
||||||
|
COLLATE = utf8mb4_unicode_ci COMMENT ='数据对齐日增量表:用户获得点赞数';
|
||||||
|
</insert>
|
||||||
|
|
||||||
|
<insert id="createDataAlignNoteLikeCountTempTable" parameterType="map">
|
||||||
|
CREATE TABLE IF NOT EXISTS `t_data_align_note_like_count_temp_${tableNameSuffix}`
|
||||||
|
(
|
||||||
|
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',
|
||||||
|
`note_id` bigint unsigned NOT NULL COMMENT '笔记ID',
|
||||||
|
PRIMARY KEY (`id`) USING BTREE,
|
||||||
|
UNIQUE KEY `uk_note_id` (`note_id`)
|
||||||
|
) ENGINE = InnoDB
|
||||||
|
DEFAULT CHARSET = utf8mb4
|
||||||
|
COLLATE = utf8mb4_unicode_ci COMMENT ='数据对齐日增量表:笔记获得点赞数';
|
||||||
|
</insert>
|
||||||
|
|
||||||
|
<insert id="createDataAlignNotePublishCountTempTable" parameterType="map">
|
||||||
|
CREATE TABLE IF NOT EXISTS `t_data_align_note_publish_count_temp_${tableNameSuffix}`
|
||||||
|
(
|
||||||
|
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',
|
||||||
|
`note_id` bigint unsigned NOT NULL COMMENT '笔记ID',
|
||||||
|
PRIMARY KEY (`id`) USING BTREE,
|
||||||
|
UNIQUE KEY `uk_note_id` (`note_id`)
|
||||||
|
) ENGINE = InnoDB
|
||||||
|
DEFAULT CHARSET = utf8mb4
|
||||||
|
COLLATE = utf8mb4_unicode_ci COMMENT ='数据对齐日增量表:用户发布笔记数';
|
||||||
|
</insert>
|
||||||
|
</mapper>
|
||||||
@@ -0,0 +1,31 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8" ?>
|
||||||
|
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
|
||||||
|
<mapper namespace="com.hanserwei.hannote.data.align.domain.mapper.DeleteTableMapper">
|
||||||
|
<delete id="deleteDataAlignFollowingCountTempTable" parameterType="map">
|
||||||
|
DROP TABLE IF EXISTS `t_data_align_following_count_temp_${tableNameSuffix}`;
|
||||||
|
</delete>
|
||||||
|
|
||||||
|
<delete id="deleteDataAlignFansCountTempTable" parameterType="map">
|
||||||
|
DROP TABLE IF EXISTS `t_data_align_fans_count_temp_${tableNameSuffix}`;
|
||||||
|
</delete>
|
||||||
|
|
||||||
|
<delete id="deleteDataAlignNoteCollectCountTempTable" parameterType="map">
|
||||||
|
DROP TABLE IF EXISTS `t_data_align_note_collect_count_temp_${tableNameSuffix}`;
|
||||||
|
</delete>
|
||||||
|
|
||||||
|
<delete id="deleteDataAlignUserCollectCountTempTable" parameterType="map">
|
||||||
|
DROP TABLE IF EXISTS `t_data_align_user_collect_count_temp_${tableNameSuffix}`;
|
||||||
|
</delete>
|
||||||
|
|
||||||
|
<delete id="deleteDataAlignUserLikeCountTempTable" parameterType="map">
|
||||||
|
DROP TABLE IF EXISTS `t_data_align_user_like_count_temp_${tableNameSuffix}`;
|
||||||
|
</delete>
|
||||||
|
|
||||||
|
<delete id="deleteDataAlignNoteLikeCountTempTable" parameterType="map">
|
||||||
|
DROP TABLE IF EXISTS `t_data_align_note_like_count_temp_${tableNameSuffix}`;
|
||||||
|
</delete>
|
||||||
|
|
||||||
|
<delete id="deleteDataAlignNotePublishCountTempTable" parameterType="map">
|
||||||
|
DROP TABLE IF EXISTS `t_data_align_note_publish_count_temp_${tableNameSuffix}`;
|
||||||
|
</delete>
|
||||||
|
</mapper>
|
||||||
@@ -0,0 +1,23 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8" ?>
|
||||||
|
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
|
||||||
|
<mapper namespace="com.hanserwei.hannote.data.align.domain.mapper.InsertRecordMapper">
|
||||||
|
<insert id="insert2DataAlignNoteLikeCountTempTable" parameterType="map">
|
||||||
|
insert into `t_data_align_note_like_count_temp_${tableNameSuffix}` (note_id)
|
||||||
|
values (#{noteId})
|
||||||
|
</insert>
|
||||||
|
|
||||||
|
<insert id="insert2DataAlignUserLikeCountTempTable" parameterType="map">
|
||||||
|
insert into `t_data_align_user_like_count_temp_${tableNameSuffix}` (user_id)
|
||||||
|
values (#{userId})
|
||||||
|
</insert>
|
||||||
|
|
||||||
|
<insert id="insert2DataAlignNoteCollectCountTempTable" parameterType="map">
|
||||||
|
insert into `t_data_align_note_collect_count_temp_${tableNameSuffix}` (note_id)
|
||||||
|
values (#{noteId})
|
||||||
|
</insert>
|
||||||
|
|
||||||
|
<insert id="insert2DataAlignUserCollectCountTempTable" parameterType="map">
|
||||||
|
insert into `t_data_align_user_collect_count_temp_${tableNameSuffix}` (user_id)
|
||||||
|
values (#{userId})
|
||||||
|
</insert>
|
||||||
|
</mapper>
|
||||||
@@ -99,6 +99,7 @@ public class LikeUnlikeNoteConsumer implements RocketMQListener<Message> {
|
|||||||
|
|
||||||
// 执行更新
|
// 执行更新
|
||||||
boolean update = noteLikeDOService.update(updateEntity, wrapper);
|
boolean update = noteLikeDOService.update(updateEntity, wrapper);
|
||||||
|
log.info("==> 【取消点赞笔记】更新数据库成功,update: {}", update);
|
||||||
|
|
||||||
if (!update) {
|
if (!update) {
|
||||||
return;
|
return;
|
||||||
|
|||||||
@@ -465,7 +465,7 @@ public class NoteServiceImpl extends ServiceImpl<NoteDOMapper, NoteDO> implement
|
|||||||
String contentUuid = noteDO1.getContentUuid();
|
String contentUuid = noteDO1.getContentUuid();
|
||||||
|
|
||||||
// 笔记内容是否更新成功
|
// 笔记内容是否更新成功
|
||||||
boolean isUpdateContentSuccess = false;
|
boolean isUpdateContentSuccess;
|
||||||
if (StringUtils.isBlank(content)) {
|
if (StringUtils.isBlank(content)) {
|
||||||
// 若笔记内容为空,则删除 K-V 存储
|
// 若笔记内容为空,则删除 K-V 存储
|
||||||
isUpdateContentSuccess = keyValueRpcService.deleteNoteContent(contentUuid);
|
isUpdateContentSuccess = keyValueRpcService.deleteNoteContent(contentUuid);
|
||||||
@@ -792,16 +792,15 @@ public class NoteServiceImpl extends ServiceImpl<NoteDOMapper, NoteDO> implement
|
|||||||
|
|
||||||
NoteUnlikeLuaResultEnum noteUnlikeLuaResultEnum = NoteUnlikeLuaResultEnum.valueOf(result);
|
NoteUnlikeLuaResultEnum noteUnlikeLuaResultEnum = NoteUnlikeLuaResultEnum.valueOf(result);
|
||||||
log.info("==> 【笔记取消点赞】Lua 脚本返回结果: {}", noteUnlikeLuaResultEnum);
|
log.info("==> 【笔记取消点赞】Lua 脚本返回结果: {}", noteUnlikeLuaResultEnum);
|
||||||
assert noteUnlikeLuaResultEnum != null;
|
switch (Objects.requireNonNull(noteUnlikeLuaResultEnum)) {
|
||||||
switch (noteUnlikeLuaResultEnum) {
|
|
||||||
// 布隆过滤器不存在
|
// 布隆过滤器不存在
|
||||||
case NOT_EXIST -> {
|
case NOT_EXIST -> {//笔记不存在
|
||||||
//笔记不存在
|
|
||||||
//异步初始化布隆过滤器
|
//异步初始化布隆过滤器
|
||||||
threadPoolTaskExecutor.submit(() -> {
|
threadPoolTaskExecutor.submit(() -> {
|
||||||
// 保底1天+随机秒数
|
// 保底1天+随机秒数
|
||||||
long expireSeconds = 60 * 60 * 24 + RandomUtil.randomInt(60 * 60 * 24);
|
long expireSeconds = 60 * 60 * 24 + RandomUtil.randomInt(60 * 60 * 24);
|
||||||
batchAddNoteLike2BloomAndExpire(userId, expireSeconds, bloomUserNoteLikeListKey);
|
batchAddNoteLike2BloomAndExpire(userId, expireSeconds, bloomUserNoteLikeListKey);
|
||||||
|
});
|
||||||
// 从数据库中校验笔记是否被点赞
|
// 从数据库中校验笔记是否被点赞
|
||||||
long count = noteLikeDOService.count(new LambdaQueryWrapper<>(NoteLikeDO.class)
|
long count = noteLikeDOService.count(new LambdaQueryWrapper<>(NoteLikeDO.class)
|
||||||
.eq(NoteLikeDO::getUserId, userId)
|
.eq(NoteLikeDO::getUserId, userId)
|
||||||
@@ -811,7 +810,6 @@ public class NoteServiceImpl extends ServiceImpl<NoteDOMapper, NoteDO> implement
|
|||||||
log.info("==> 【笔记取消点赞】用户未点赞该笔记");
|
log.info("==> 【笔记取消点赞】用户未点赞该笔记");
|
||||||
throw new ApiException(ResponseCodeEnum.NOTE_NOT_LIKED);
|
throw new ApiException(ResponseCodeEnum.NOTE_NOT_LIKED);
|
||||||
}
|
}
|
||||||
});
|
|
||||||
}
|
}
|
||||||
// 布隆过滤器校验目标笔记未被点赞(判断绝对正确)
|
// 布隆过滤器校验目标笔记未被点赞(判断绝对正确)
|
||||||
case NOTE_NOT_LIKED -> throw new ApiException(ResponseCodeEnum.NOTE_NOT_LIKED);
|
case NOTE_NOT_LIKED -> throw new ApiException(ResponseCodeEnum.NOTE_NOT_LIKED);
|
||||||
@@ -821,7 +819,14 @@ public class NoteServiceImpl extends ServiceImpl<NoteDOMapper, NoteDO> implement
|
|||||||
// 用户点赞列表ZsetKey
|
// 用户点赞列表ZsetKey
|
||||||
String userNoteLikeZSetKey = RedisKeyConstants.buildUserNoteLikeZSetKey(userId);
|
String userNoteLikeZSetKey = RedisKeyConstants.buildUserNoteLikeZSetKey(userId);
|
||||||
|
|
||||||
redisTemplate.opsForZSet().remove(userNoteLikeZSetKey, noteId);
|
// TODO: 后续考虑换掉布隆过滤器。
|
||||||
|
|
||||||
|
Long removed = redisTemplate.opsForZSet().remove(userNoteLikeZSetKey, noteId);
|
||||||
|
|
||||||
|
if (Objects.nonNull(removed) && removed == 0) {
|
||||||
|
log.info("==> 【笔记取消点赞】用户未点赞该笔记");
|
||||||
|
throw new ApiException(ResponseCodeEnum.NOTE_NOT_LIKED);
|
||||||
|
}
|
||||||
|
|
||||||
//4. 发送 MQ, 数据更新落库
|
//4. 发送 MQ, 数据更新落库
|
||||||
// 构建MQ消息体
|
// 构建MQ消息体
|
||||||
@@ -899,9 +904,7 @@ public class NoteServiceImpl extends ServiceImpl<NoteDOMapper, NoteDO> implement
|
|||||||
// 若目标笔记已经收藏
|
// 若目标笔记已经收藏
|
||||||
if (count > 0) {
|
if (count > 0) {
|
||||||
// 异步初始化布隆过滤器
|
// 异步初始化布隆过滤器
|
||||||
threadPoolTaskExecutor.submit(() -> {
|
threadPoolTaskExecutor.submit(() -> batchAddNoteCollect2BloomAndExpire(userId, expireSeconds, bloomUserNoteCollectListKey));
|
||||||
batchAddNoteCollect2BloomAndExpire(userId, expireSeconds, bloomUserNoteCollectListKey);
|
|
||||||
});
|
|
||||||
throw new ApiException(ResponseCodeEnum.NOTE_ALREADY_COLLECTED);
|
throw new ApiException(ResponseCodeEnum.NOTE_ALREADY_COLLECTED);
|
||||||
}
|
}
|
||||||
// 若目标笔记未被收藏,查询当前用户是否有收藏其他笔记,有则同步初始化布隆过滤器
|
// 若目标笔记未被收藏,查询当前用户是否有收藏其他笔记,有则同步初始化布隆过滤器
|
||||||
@@ -932,7 +935,6 @@ public class NoteServiceImpl extends ServiceImpl<NoteDOMapper, NoteDO> implement
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. 更新用户 ZSET 收藏列表
|
|
||||||
// 3. 更新用户 ZSET 收藏列表
|
// 3. 更新用户 ZSET 收藏列表
|
||||||
LocalDateTime now = LocalDateTime.now();
|
LocalDateTime now = LocalDateTime.now();
|
||||||
// Lua 脚本路径
|
// Lua 脚本路径
|
||||||
@@ -1033,7 +1035,7 @@ public class NoteServiceImpl extends ServiceImpl<NoteDOMapper, NoteDO> implement
|
|||||||
|
|
||||||
DefaultRedisScript<Long> script = new DefaultRedisScript<>();
|
DefaultRedisScript<Long> script = new DefaultRedisScript<>();
|
||||||
// Lua 脚本路径
|
// Lua 脚本路径
|
||||||
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/lua/bloom_note_uncollect_check.lua")));
|
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/lua/bloom_note_uncollected_check.lua")));
|
||||||
// 返回值类型
|
// 返回值类型
|
||||||
script.setResultType(Long.class);
|
script.setResultType(Long.class);
|
||||||
|
|
||||||
@@ -1069,7 +1071,10 @@ public class NoteServiceImpl extends ServiceImpl<NoteDOMapper, NoteDO> implement
|
|||||||
// 用户收藏列表 ZSet Key
|
// 用户收藏列表 ZSet Key
|
||||||
String userNoteCollectZSetKey = RedisKeyConstants.buildUserNoteCollectZSetKey(userId);
|
String userNoteCollectZSetKey = RedisKeyConstants.buildUserNoteCollectZSetKey(userId);
|
||||||
|
|
||||||
redisTemplate.opsForZSet().remove(userNoteCollectZSetKey, noteId);
|
Long removed = redisTemplate.opsForZSet().remove(userNoteCollectZSetKey, noteId);
|
||||||
|
if (removed != null && removed == 0) {
|
||||||
|
throw new ApiException(ResponseCodeEnum.NOTE_NOT_COLLECTED);
|
||||||
|
}
|
||||||
|
|
||||||
// 4. 发送 MQ, 数据更新落库
|
// 4. 发送 MQ, 数据更新落库
|
||||||
// 构建消息体 DTO
|
// 构建消息体 DTO
|
||||||
@@ -1271,11 +1276,14 @@ public class NoteServiceImpl extends ServiceImpl<NoteDOMapper, NoteDO> implement
|
|||||||
*/
|
*/
|
||||||
private void batchAddNoteLike2BloomAndExpire(Long userId, long expireSeconds, String bloomUserNoteLikeListKey) {
|
private void batchAddNoteLike2BloomAndExpire(Long userId, long expireSeconds, String bloomUserNoteLikeListKey) {
|
||||||
try {
|
try {
|
||||||
|
log.info("## 异步初始化【笔记点赞】布隆过滤器开始: userId={}", userId);
|
||||||
// 异步全量同步一下,并设置过期时间
|
// 异步全量同步一下,并设置过期时间
|
||||||
List<NoteLikeDO> noteLikeDOS = noteLikeDOService.list(new LambdaQueryWrapper<>(NoteLikeDO.class)
|
List<NoteLikeDO> noteLikeDOS = noteLikeDOService.list(new LambdaQueryWrapper<>(NoteLikeDO.class)
|
||||||
.select(NoteLikeDO::getNoteId)
|
.select(NoteLikeDO::getNoteId)
|
||||||
.eq(NoteLikeDO::getUserId, userId)
|
.eq(NoteLikeDO::getUserId, userId)
|
||||||
.eq(NoteLikeDO::getStatus, LikeStatusEnum.LIKE.getCode()));
|
.eq(NoteLikeDO::getStatus, LikeStatusEnum.LIKE.getCode()));
|
||||||
|
log.info("## 异步初始化【笔记点赞】布隆过滤器,用户笔记点赞数量: {},笔记ID:{}", noteLikeDOS.size(),
|
||||||
|
JsonUtils.toJsonString(noteLikeDOS.stream().map(NoteLikeDO::getNoteId).toList()));
|
||||||
|
|
||||||
if (CollUtil.isNotEmpty(noteLikeDOS)) {
|
if (CollUtil.isNotEmpty(noteLikeDOS)) {
|
||||||
DefaultRedisScript<Long> script = new DefaultRedisScript<>();
|
DefaultRedisScript<Long> script = new DefaultRedisScript<>();
|
||||||
@@ -1288,7 +1296,8 @@ public class NoteServiceImpl extends ServiceImpl<NoteDOMapper, NoteDO> implement
|
|||||||
List<Object> luaArgs = Lists.newArrayList();
|
List<Object> luaArgs = Lists.newArrayList();
|
||||||
noteLikeDOS.forEach(noteLikeDO -> luaArgs.add(noteLikeDO.getNoteId())); // 将每个点赞的笔记 ID 传入
|
noteLikeDOS.forEach(noteLikeDO -> luaArgs.add(noteLikeDO.getNoteId())); // 将每个点赞的笔记 ID 传入
|
||||||
luaArgs.add(expireSeconds); // 最后一个参数是过期时间(秒)
|
luaArgs.add(expireSeconds); // 最后一个参数是过期时间(秒)
|
||||||
redisTemplate.execute(script, Collections.singletonList(bloomUserNoteLikeListKey), luaArgs.toArray());
|
Long result = redisTemplate.execute(script, Collections.singletonList(bloomUserNoteLikeListKey), luaArgs.toArray());
|
||||||
|
log.info("## 异步初始化【笔记点赞】布隆过滤器结果: {}", result);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("## 异步初始化布隆过滤器异常: ", e);
|
log.error("## 异步初始化布隆过滤器异常: ", e);
|
||||||
|
|||||||
@@ -36,6 +36,7 @@ import java.time.LocalDateTime;
|
|||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
|
@SuppressWarnings("UnstableApiUsage")
|
||||||
@Component
|
@Component
|
||||||
@RocketMQMessageListener(
|
@RocketMQMessageListener(
|
||||||
consumerGroup = "han_note_group_" + MQConstants.TOPIC_FOLLOW_OR_UNFOLLOW, //han_note_group_FollowUnfollowTopic
|
consumerGroup = "han_note_group_" + MQConstants.TOPIC_FOLLOW_OR_UNFOLLOW, //han_note_group_FollowUnfollowTopic
|
||||||
|
|||||||
Reference in New Issue
Block a user