Compare commits

..

3 Commits

Author SHA1 Message Date
218f4c6974 refactor(search):优化 Canal 数据同步逻辑
- 移除未使用的 IndexResponse 导入
- 简化 Elasticsearch 索引写入操作,去除不必要的响应处理
- 保持异常处理逻辑不变,确保错误日志记录完整性
2025-11-03 14:53:09 +08:00
268a009c9b feat(search): 实现用户索引同步与删除功能
- 新增处理用户事件的逻辑,包括插入和更新操作
- 实现用户文档的删除方法
- 添加批量同步用户索引和笔记索引的功能
- 扩展 selectEsNoteIndexData 方法支持按用户 ID 查询
- 新增 selectEsUserIndexData 方法用于查询用户索引数据
- 更新 XML 映射文件以支持新的查询条件和字段
2025-11-03 14:52:51 +08:00
678c8ab8eb feat(search): 实现 Canal 数据同步到 Elasticsearch 功能
- 添加 Elasticsearch 客户端依赖及配置
- 实现 Canal 数据监听与解析逻辑
- 新增笔记索引同步与删除处理
- 添加 MyBatis Mapper 扫描与数据源配置
- 定义笔记状态与可见性枚举类
- 配置 MyBatis XML 映射文件路径
2025-11-03 14:22:55 +08:00
8 changed files with 386 additions and 45 deletions

View File

@@ -20,11 +20,12 @@
<option name="lombokDataAnnotation" value="true" />
<option name="lombokNoArgsConstructor" value="true" />
<option name="mapperAnnotaion" value="true" />
<option name="mapperFilesFolder" value="$PROJECT_DIR$/han-note-data-align/src/main/resources/mapperxml" />
<option name="mapperFilesFolder" value="$PROJECT_DIR$/han-note-search/src/main/resources/mapperxml" />
<option name="mapperFilesFolderList">
<list>
<option value="$PROJECT_DIR$/han-note-auth/src/main/resources/mapperxml" />
<option value="$PROJECT_DIR$/han-note-data-align/src/main/resources/mapperxml" />
<option value="$PROJECT_DIR$/han-note-search/src/main/resources/mapperxml" />
</list>
</option>
<option name="moduleNameToPackageAndPathMap">

View File

@@ -76,7 +76,19 @@
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-spring-boot3-starter</artifactId>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
<!-- Druid 数据库连接池 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-3-starter</artifactId>
</dependency>
</dependencies>
<build>

View File

@@ -1,11 +1,13 @@
package com.hanserwei.hannote.search;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
@MapperScan("com.hanserwei.hannote.search.domain.mapper")
public class HannoteSearchApplication {
public static void main(String[] args) {
SpringApplication.run(HannoteSearchApplication.class, args);

View File

@@ -1,14 +1,28 @@
package com.hanserwei.hannote.search.canal;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.IndexRequest;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.common.collect.Maps;
import com.hanserwei.framework.common.enums.StatusEnum;
import com.hanserwei.hannote.search.domain.mapper.SelectMapper;
import com.hanserwei.hannote.search.enums.NoteStatusEnum;
import com.hanserwei.hannote.search.enums.NoteVisibleEnum;
import com.hanserwei.hannote.search.index.NoteIndex;
import com.hanserwei.hannote.search.index.UserIndex;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@Component
@@ -19,17 +33,10 @@ public class CanalSchedule implements Runnable {
private CanalProperties canalProperties;
@Resource
private CanalConnector canalConnector;
/**
* 打印字段信息
*
* @param columns 字段信息
*/
private static void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
@Resource
private SelectMapper selectMapper;
@Resource
private ElasticsearchClient client;
@Override
@Scheduled(fixedDelay = 100) // 每隔 100ms 被执行一次
@@ -50,11 +57,11 @@ public class CanalSchedule implements Runnable {
// 拉取数据为空,休眠 1s, 防止频繁拉取
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
log.error("消费 Canal 批次数据异常", e);
log.error("休眠异常", e);
}
} else {
// 如果当前批次有数据,打印这批次中的数据条目
printEntry(message.getEntries());
// 如果当前批次有数据,处理这批次数据
processEntry(message.getEntries());
}
// 对当前批次的消息进行 ack 确认,表示该批次的数据已经被成功消费
@@ -67,46 +74,256 @@ public class CanalSchedule implements Runnable {
}
/**
* 打印这一批次中的数据条目(和官方示例代码一致,后续小节中会自定义这块)
*
* 处理这一批次数据
* @param entrys 批次数据
*/
private void printEntry(List<CanalEntry.Entry> entrys) {
private void processEntry(List<CanalEntry.Entry> entrys) throws Exception {
// 循环处理批次数据
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
// 只处理 ROWDATA 行数据类型的 Entry忽略事务等其他类型
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
// 获取事件类型INSERT、UPDATE、DELETE 等等)
CanalEntry.EventType eventType = entry.getHeader().getEventType();
// 获取数据库名称
String database = entry.getHeader().getSchemaName();
// 获取表名称
String table = entry.getHeader().getTableName();
// 解析出 RowChange 对象,包含 RowData 和事件相关信息
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
// 遍历所有行数据RowData
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
// 获取行中所有列的最新值AfterColumns
List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
// 将列数据解析为 Map方便后续处理
Map<String, Object> columnMap = parseColumns2Map(columns);
// 自定义处理
log.info("EventType: {}, Database: {}, Table: {}, Columns: {}", eventType, database, table, columnMap);
// 处理事件
processEvent(columnMap, table, eventType);
}
}
}
}
CanalEntry.RowChange rowChage;
/**
 * Routes a parsed row event to the handler for its source table.
 *
 * @param columnMap column name → latest value for the changed row
 * @param table     name of the table the binlog event came from
 * @param eventType Canal event type (INSERT / UPDATE / DELETE, ...)
 */
private void processEvent(Map<String, Object> columnMap, String table, CanalEntry.EventType eventType) {
    if (table.equals("t_note")) {
        // 笔记表
        handleNoteEvent(columnMap, eventType);
    } else if (table.equals("t_user")) {
        // 用户表
        handleUserEvent(columnMap, eventType);
    } else {
        log.warn("Table: {} not support", table);
    }
}
/**
 * Handles a binlog event on the user table (t_user).
 * <p>
 * INSERT creates the user document; UPDATE either re-syncs the user document
 * plus all of the user's note documents (when the account is enabled and not
 * logically deleted) or removes the user document (when disabled or deleted).
 *
 * @param columnMap column name → latest value for the changed row
 * @param eventType Canal event type of the row change
 */
private void handleUserEvent(Map<String, Object> columnMap, CanalEntry.EventType eventType) {
    // Primary key of the changed user row.
    Long userId = Long.parseLong(columnMap.get("id").toString());

    if (eventType == CanalEntry.EventType.INSERT) {
        // New user → create the user document.
        syncUserIndex(userId);
    } else if (eventType == CanalEntry.EventType.UPDATE) {
        // Post-update account status and logical-delete flag.
        Integer status = Integer.parseInt(columnMap.get("status").toString());
        Integer isDeleted = Integer.parseInt(columnMap.get("is_deleted").toString());

        boolean active = Objects.equals(status, StatusEnum.ENABLE.getValue())
                && Objects.equals(isDeleted, 0);
        boolean removed = Objects.equals(status, StatusEnum.DISABLED.getValue())
                || Objects.equals(isDeleted, 1);

        if (active) {
            // Enabled and not deleted → refresh user index and note index.
            syncNotesIndexAndUserIndex(userId);
        } else if (removed) {
            // Disabled or logically deleted → drop the user document.
            deleteUserDocument(String.valueOf(userId));
        }
    } else {
        log.warn("Unhandled event type for t_user: {}", eventType);
    }
}
/**
 * Deletes a user document from the Elasticsearch user index.
 * Failures are logged and swallowed so one bad document does not stop
 * the Canal consumption loop.
 *
 * @param documentId 文档 ID (the user ID as a string)
 */
private void deleteUserDocument(String documentId) {
    try {
        client.delete(d -> d.index(UserIndex.NAME).id(documentId));
    } catch (IOException e) {
        log.error("删除用户文档异常", e);
    }
}
CanalEntry.EventType eventType = rowChage.getEventType();
System.out.printf("================> binlog[%s:%s] , name[%s,%s] , eventType : %s%n",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType);
/**
* 同步用户索引、笔记索引(可能是多条)
*
* @param userId 用户 ID
*/
private void syncNotesIndexAndUserIndex(Long userId) {
BulkRequest.Builder bulkRequestBuilder = new BulkRequest.Builder();
// 1. 用户索引
List<Map<String, Object>> userResult = selectMapper.selectEsUserIndexData(userId);
// 遍历查询结果,将每条记录同步到 Elasticsearch
for (Map<String, Object> recordMap : userResult) {
// 构建 BulkOperation
BulkOperation op = BulkOperation.of(b -> b
.index(i -> i
.index(UserIndex.NAME)
.id(String.valueOf(recordMap.get(UserIndex.FIELD_USER_ID)))
.document(recordMap)
)
);
// 通过 operations() 方法添加到 Builder
bulkRequestBuilder.operations(op);
}
List<Map<String, Object>> noteResult = selectMapper.selectEsNoteIndexData(null, userId);
for (Map<String, Object> recordMap : noteResult) {
BulkOperation op = BulkOperation.of(b -> b
.index(i -> i
.index(NoteIndex.NAME)
.id(String.valueOf(recordMap.get(NoteIndex.FIELD_NOTE_ID)))
.document(recordMap)
)
);
bulkRequestBuilder.operations(op);
}
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
// 构建 BulkRequest
BulkRequest bulkRequest = bulkRequestBuilder.build();
try {
client.bulk(bulkRequest);
} catch (IOException e) {
log.error("执行批量请求出错:", e);
}
}
/**
 * Writes the user's index data to the Elasticsearch user index, one
 * document per record returned by the mapper.
 *
 * @param userId 用户 ID to sync
 */
private void syncUserIndex(Long userId) {
    // Fetch the full-field user index data from the database.
    List<Map<String, Object>> userResult = selectMapper.selectEsUserIndexData(userId);

    for (Map<String, Object> record : userResult) {
        // Build the index request: index name, document ID (primary key), body.
        IndexRequest<Object> indexRequest = IndexRequest.of(builder -> builder
                .index(UserIndex.NAME)
                .id(String.valueOf(record.get(UserIndex.FIELD_USER_ID)))
                .document(record));
        try {
            // Write the document; keep going on failure so the rest of the
            // batch is still indexed.
            client.index(indexRequest);
        } catch (IOException e) {
            log.error("写入索引异常", e);
        }
    }
}
/**
 * Handles a binlog event on the note table (t_note).
 * <p>
 * INSERT indexes the note; UPDATE either overwrites the note document (when
 * the note is normal and publicly visible) or deletes it (when the note is
 * private, logically deleted, or taken down).
 *
 * @param columnMap column name → latest value for the changed row
 * @param eventType Canal event type of the row change
 */
private void handleNoteEvent(Map<String, Object> columnMap, CanalEntry.EventType eventType) {
    // Primary key of the changed note row.
    Long noteId = Long.parseLong(columnMap.get("id").toString());

    if (eventType == CanalEntry.EventType.INSERT) {
        // New note → create the note document.
        syncNoteIndex(noteId);
    } else if (eventType == CanalEntry.EventType.UPDATE) {
        // Post-update note status and visibility.
        Integer status = Integer.parseInt(columnMap.get("status").toString());
        Integer visible = Integer.parseInt(columnMap.get("visible").toString());

        boolean publicAndNormal = Objects.equals(status, NoteStatusEnum.NORMAL.getCode())
                && Objects.equals(visible, NoteVisibleEnum.PUBLIC.getCode());
        boolean hidden = Objects.equals(visible, NoteVisibleEnum.PRIVATE.getCode())
                || Objects.equals(status, NoteStatusEnum.DELETED.getCode())
                || Objects.equals(status, NoteStatusEnum.DOWNED.getCode());

        if (publicAndNormal) {
            // Normal and public → overwrite the existing document.
            syncNoteIndex(noteId);
        } else if (hidden) {
            // Private, deleted, or taken down → remove the document.
            deleteNoteDocument(String.valueOf(noteId));
        }
    } else {
        log.warn("Unhandled event type for t_note: {}", eventType);
    }
}
/**
 * Deletes a note document from the Elasticsearch note index.
 * Failures are logged and swallowed so the consumption loop keeps running.
 *
 * @param documentId 文档ID (the note ID as a string)
 */
private void deleteNoteDocument(String documentId) {
    try {
        client.delete(d -> d.index(NoteIndex.NAME).id(documentId));
    } catch (IOException e) {
        log.error("删除笔记文档异常", e);
    }
}
/**
 * Writes the note's index data to the Elasticsearch note index, one
 * document per record returned by the mapper.
 *
 * @param noteId 笔记ID to sync
 */
private void syncNoteIndex(Long noteId) {
    // Fetch the full-field note index data from the database.
    List<Map<String, Object>> noteResult = selectMapper.selectEsNoteIndexData(noteId, null);

    for (Map<String, Object> record : noteResult) {
        // Build the index request: index name, document ID (primary key), body.
        IndexRequest<Object> indexRequest = IndexRequest.of(builder -> builder
                .index(NoteIndex.NAME)
                .id(String.valueOf(record.get(NoteIndex.FIELD_NOTE_ID)))
                .document(record));
        try {
            // Write the document; keep going on failure so the rest of the
            // batch is still indexed.
            client.index(indexRequest);
        } catch (IOException e) {
            log.error("写入 Elasticsearch 索引异常", e);
        }
    }
}
/**
 * Converts a list of Canal columns into a name → value map, skipping
 * any null column entries.
 *
 * @param columns 列数据 from a Canal RowData
 * @return map of column name to column value
 */
private Map<String, Object> parseColumns2Map(List<CanalEntry.Column> columns) {
    Map<String, Object> columnMap = Maps.newHashMap();
    for (CanalEntry.Column column : columns) {
        if (column != null) {
            columnMap.put(column.getName(), column.getValue());
        }
    }
    return columnMap;
}
}

View File

@@ -0,0 +1,27 @@
package com.hanserwei.hannote.search.domain.mapper;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import java.util.Map;
@Mapper
public interface SelectMapper {
    /**
     * Queries the full-field data needed to build note documents.
     * Both filters are optional; passing null for one queries by the other.
     *
     * @param noteId 笔记 ID — filter by note primary key (nullable)
     * @param userId 用户 ID — filter by note author (nullable)
     * @return 笔记文档所需全字段数据 (full-field rows for the note index)
     */
    List<Map<String, Object>> selectEsNoteIndexData(@Param("noteId") Long noteId, @Param("userId") Long userId);

    /**
     * Queries the full-field data needed to build user documents.
     *
     * @param userId 用户 ID — filter by user primary key (nullable)
     * @return 用户索引所需全字段数据 (full-field rows for the user index)
     */
    List<Map<String, Object>> selectEsUserIndexData(@Param("userId") Long userId);
}

View File

@@ -0,0 +1,18 @@
package com.hanserwei.hannote.search.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
@Getter
@AllArgsConstructor
public enum NoteStatusEnum {

    /** Pending review. */
    BE_EXAMINE(0),
    /** Normal, publicly displayed. */
    NORMAL(1),
    /** Logically deleted. */
    DELETED(2),
    /** Taken down. */
    DOWNED(3),
    ;

    /** Numeric code stored in the t_note.status column. */
    private final Integer code;
}

View File

@@ -0,0 +1,15 @@
package com.hanserwei.hannote.search.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
@Getter
@AllArgsConstructor
public enum NoteVisibleEnum {

    /** Public — visible to everyone. */
    PUBLIC(0),
    /** Private — visible only to the author. */
    PRIVATE(1);

    /** Numeric code stored in the t_note.visible column. */
    private final Integer code;
}

View File

@@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.hanserwei.hannote.search.domain.mapper.SelectMapper">
    <!-- Full-field data for the note Elasticsearch index.
         Only public (visible = 0) and normal (status = 1) notes are indexed.
         noteId and userId are optional, independent filters. -->
    <select id="selectEsNoteIndexData" resultType="map" parameterType="map">
        select n.id,
        n.title,
        n.topic_name as topic,
        n.type,
        n.img_uris,
        DATE_FORMAT(n.create_time, '%Y-%m-%d %H:%i:%s') AS create_time,
        DATE_FORMAT(n.update_time, '%Y-%m-%d %H:%i:%s') AS update_time,
        u.nickname,
        u.avatar,
        IFNULL(nc.like_total, 0) as like_total,
        IFNULL(nc.collect_total, 0) as collect_total,
        IFNULL(nc.comment_total, 0) as comment_total
        from t_note n
        left join t_user u on n.creator_id = u.id
        left join t_note_count nc on n.id = nc.note_id
        where n.visible = 0
        and n.`status` = 1
        <if test="noteId != null">
            and n.id = #{noteId}
        </if>
        <if test="userId != null">
            and u.id = #{userId}
        </if>
        <!-- The LIMIT lives in a second noteId <if> (not merged with the one
             above) so it is emitted AFTER the optional userId condition —
             merging them would produce invalid SQL when both IDs are set. -->
        <if test="noteId != null">
            limit 1
        </if>
    </select>
    <!-- Full-field data for the user Elasticsearch index.
         Only enabled (status = 0) and not logically deleted users are indexed. -->
    <select id="selectEsUserIndexData" resultType="map" parameterType="map">
        select u.id,
        u.nickname,
        u.avatar,
        u.han_note_id,
        IFNULL(uc.note_total, 0) as note_total,
        IFNULL(uc.fans_total, 0) as fans_total
        from t_user u
        left join t_user_count uc on u.id = uc.user_id
        where u.`status` = 0
        and u.is_deleted = 0
        <if test="userId != null">
            and u.id = #{userId}
        </if>
    </select>
</mapper>