feat(search): implement Canal data synchronization to Elasticsearch

- Add Elasticsearch client dependency and configuration (a rough config sketch follows the summary below)
- Implement Canal data listening and parsing logic
- Add note index synchronization and deletion handling
- Add MyBatis Mapper scanning and data source configuration
- Define note status and visibility enum classes
- Configure the MyBatis XML mapper file path
2025-11-03 14:22:55 +08:00
parent 39d2eb1063
commit 678c8ab8eb
8 changed files with 247 additions and 45 deletions
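
The Elasticsearch client configuration itself is not visible in this diff excerpt. As a rough, non-authoritative sketch of what the first bullet likely involves (the class name ElasticsearchClientConfig and the host/port values are assumptions, not taken from this commit), a Spring bean for the co.elastic.clients Java client could look like:

package com.hanserwei.hannote.search.config;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; the actual class name and connection
// settings used by this commit may differ.
@Configuration
public class ElasticsearchClientConfig {

    @Bean
    public ElasticsearchClient elasticsearchClient() {
        // Low-level REST client pointing at the ES node (host/port assumed)
        RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200)).build();
        // Transport layer using Jackson for JSON (de)serialization
        ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        // Type-safe Java API client, injected into CanalSchedule via @Resource
        return new ElasticsearchClient(transport);
    }
}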

View File

@@ -1,11 +1,13 @@
package com.hanserwei.hannote.search;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
@MapperScan("com.hanserwei.hannote.search.domain.mapper")
public class HannoteSearchApplication {
public static void main(String[] args) {
SpringApplication.run(HannoteSearchApplication.class, args);
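
The commit message mentions MyBatis mapper scanning, data source configuration, and the XML mapper file path. Besides the @MapperScan annotation added above, those settings typically live in application.yml (e.g. mybatis.mapper-locations), which is not shown in this diff. Purely as an illustrative sketch with assumed names and paths, a programmatic equivalent would be:

package com.hanserwei.hannote.search.config;

import javax.sql.DataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

// Hypothetical configuration; the commit most likely sets the same values
// through Spring Boot properties rather than Java config.
@Configuration
public class MyBatisConfig {

    @Bean
    public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        // Data source configured elsewhere (e.g. spring.datasource.* properties)
        factoryBean.setDataSource(dataSource);
        // XML mapper location; the actual path used by the commit is assumed
        factoryBean.setMapperLocations(
                new PathMatchingResourcePatternResolver().getResources("classpath*:mapper/*.xml"));
        return factoryBean.getObject();
    }
}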

View File

@@ -1,14 +1,25 @@
package com.hanserwei.hannote.search.canal;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.IndexRequest;
import co.elastic.clients.elasticsearch.core.IndexResponse;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.common.collect.Maps;
import com.hanserwei.hannote.search.domain.mapper.SelectMapper;
import com.hanserwei.hannote.search.enums.NoteStatusEnum;
import com.hanserwei.hannote.search.enums.NoteVisibleEnum;
import com.hanserwei.hannote.search.index.NoteIndex;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@Component
@@ -19,17 +30,10 @@ public class CanalSchedule implements Runnable {
private CanalProperties canalProperties;
@Resource
private CanalConnector canalConnector;
/**
* Print column information
*
* @param columns column information
*/
private static void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
@Resource
private SelectMapper selectMapper;
@Resource
private ElasticsearchClient client;
@Override
@Scheduled(fixedDelay = 100) // executed every 100 ms
@@ -50,11 +54,11 @@ public class CanalSchedule implements Runnable {
// No data pulled in this batch; sleep for 1s to avoid polling too frequently
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
log.error("消费 Canal 批次数据异常", e);
log.error("休眠异常", e);
}
} else {
// If the current batch has data, print its entries
printEntry(message.getEntries());
// If the current batch has data, process it
processEntry(message.getEntries());
}
// Acknowledge (ack) the current batch to confirm its data has been successfully consumed
@@ -67,46 +71,149 @@ public class CanalSchedule implements Runnable {
}
/**
* Print the data entries in this batch (same as the official example code; this will be customized in a later section)
*
* Process this batch of data
* @param entrys batch data
*/
private void printEntry(List<CanalEntry.Entry> entrys) {
private void processEntry(List<CanalEntry.Entry> entrys) throws Exception {
// Iterate over and process the batch data
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
// Only process ROWDATA entries; ignore transactions and other entry types
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
// Get the event type (INSERT, UPDATE, DELETE, etc.)
CanalEntry.EventType eventType = entry.getHeader().getEventType();
// Get the database name
String database = entry.getHeader().getSchemaName();
// Get the table name
String table = entry.getHeader().getTableName();
CanalEntry.RowChange rowChage;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry,
e);
}
// Parse out the RowChange object, which contains the RowData and event-related information
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
CanalEntry.EventType eventType = rowChage.getEventType();
System.out.printf("================> binlog[%s:%s] , name[%s,%s] , eventType : %s%n",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType);
// Iterate over all row data (RowData)
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
// Get the latest values of all columns in the row (AfterColumns)
List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
// Parse the column data into a Map for easier downstream handling
Map<String, Object> columnMap = parseColumns2Map(columns);
// Custom handling
log.info("EventType: {}, Database: {}, Table: {}, Columns: {}", eventType, database, table, columnMap);
// Handle the event
processEvent(columnMap, table, eventType);
}
}
}
}
private void processEvent(Map<String, Object> columnMap, String table, CanalEntry.EventType eventType) {
switch (table) {
case "t_note" -> handleNoteEvent(columnMap, eventType); // 笔记表
case "t_user" -> handleUserEvent(columnMap, eventType); // 用户表
default -> log.warn("Table: {} not support", table);
}
}
/**
* Handle user table events
*
* @param columnMap column data
* @param eventType event type
*/
private void handleUserEvent(Map<String, Object> columnMap, CanalEntry.EventType eventType) {
}
/**
* Handle note table events
*
* @param columnMap column data
* @param eventType event type
*/
private void handleNoteEvent(Map<String, Object> columnMap, CanalEntry.EventType eventType) {
// Get the note ID
Long noteId = Long.parseLong(columnMap.get("id").toString());
// Different events require different handling
switch (eventType) {
case INSERT -> syncNoteIndex(noteId); // row insert event
case UPDATE -> {
// Row update event
// Note status after the change
Integer status = Integer.parseInt(columnMap.get("status").toString());
// Note visibility scope
Integer visible = Integer.parseInt(columnMap.get("visible").toString());
if (Objects.equals(status, NoteStatusEnum.NORMAL.getCode())
&& Objects.equals(visible, NoteVisibleEnum.PUBLIC.getCode())) { // normal status and publicly visible
// Overwrite the index document with the latest data
syncNoteIndex(noteId);
} else if (Objects.equals(visible, NoteVisibleEnum.PRIVATE.getCode()) // visible only to the author
|| Objects.equals(status, NoteStatusEnum.DELETED.getCode())
|| Objects.equals(status, NoteStatusEnum.DOWNED.getCode())) { // logically deleted or taken down
// Delete the note document from the index
deleteNoteDocument(String.valueOf(noteId));
}
}
default -> log.warn("Unhandled event type for t_note: {}", eventType);
}
}
/**
* Delete a note document
*
* @param documentId document ID
*/
private void deleteNoteDocument(String documentId) {
try {
client.delete(deleteRequest -> deleteRequest
.index(NoteIndex.NAME)
.id(documentId));
} catch (IOException e) {
log.error("删除笔记文档异常", e);
}
}
/**
* Synchronize the note index
*
* @param noteId note ID
*/
private void syncNoteIndex(Long noteId) {
// Query the database for the data needed by the Elasticsearch index
List<Map<String, Object>> result = selectMapper.selectEsNoteIndexData(noteId);
// Iterate over the query results and sync each record to Elasticsearch
for (Map<String, Object> recordMap : result) {
// Build the index request, specifying the index name
IndexRequest<Object> request = IndexRequest.of(indexRequest -> indexRequest
.index(NoteIndex.NAME)
// Set the document ID from the record's primary key "id" field
.id(String.valueOf(recordMap.get(NoteIndex.FIELD_NOTE_ID)))
// Set the document content from the query result record
.document(recordMap));
// Write the data into the Elasticsearch index
try {
IndexResponse response = client.index(request);
} catch (IOException e) {
log.error("写入 Elasticsearch 索引异常", e);
}
}
}
/**
* Parse column data into a Map
*
* @param columns column data
* @return the column name-to-value Map
*/
private Map<String, Object> parseColumns2Map(List<CanalEntry.Column> columns) {
Map<String, Object> map = Maps.newHashMap();
columns.forEach(column -> {
if (Objects.isNull(column)) return;
map.put(column.getName(), column.getValue());
});
return map;
}
}
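
CanalSchedule injects a CanalConnector and a CanalProperties bean, neither of which appears in this diff. For orientation only, a minimal connector setup with the official canal client API might look like the sketch below; the host, port, destination, credentials, and subscription filter are all assumptions, not values from this commit.

package com.hanserwei.hannote.search.config;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import java.net.InetSocketAddress;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration; the real values would normally come from
// CanalProperties rather than being hard-coded here.
@Configuration
public class CanalClientConfig {

    @Bean
    public CanalConnector canalConnector() {
        // Connect to a single canal server instance
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        connector.connect();
        // Subscribe only to the tables handled in processEvent (filter assumed)
        connector.subscribe("hannote\\.t_note,hannote\\.t_user");
        return connector;
    }
}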

View File

@@ -0,0 +1,19 @@
package com.hanserwei.hannote.search.domain.mapper;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import java.util.Map;
@Mapper
public interface SelectMapper {
/**
* Query all the fields required for the note document
*
* @param noteId note ID
* @return all the fields required for the note document
*/
List<Map<String, Object>> selectEsNoteIndexData(@Param("noteId") long noteId);
}

View File

@@ -0,0 +1,18 @@
package com.hanserwei.hannote.search.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
@Getter
@AllArgsConstructor
public enum NoteStatusEnum {
BE_EXAMINE(0), // pending review
NORMAL(1), // normal, publicly displayed
DELETED(2), // deleted
DOWNED(3), // taken down
;
private final Integer code;
}

View File

@@ -0,0 +1,15 @@
package com.hanserwei.hannote.search.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
@Getter
@AllArgsConstructor
public enum NoteVisibleEnum {
PUBLIC(0), // public, visible to everyone
PRIVATE(1); // private, visible only to the author
private final Integer code;
}

View File

@@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.hanserwei.hannote.search.domain.mapper.SelectMapper">
<select id="selectEsNoteIndexData" resultType="map" parameterType="map">
select n.id,
n.title,
n.topic_name as topic,
n.type,
n.img_uris,
DATE_FORMAT(n.create_time, '%Y-%m-%d %H:%i:%s') AS create_time,
DATE_FORMAT(n.update_time, '%Y-%m-%d %H:%i:%s') AS update_time,
u.nickname,
u.avatar,
IFNULL(nc.like_total, 0) as like_total,
IFNULL(nc.collect_total, 0) as collect_total,
IFNULL(nc.comment_total, 0) as comment_total
from t_note n
left join t_user u on n.creator_id = u.id
left join t_note_count nc on n.id = nc.note_id
where n.visible = 0
and n.`status` = 1
<if test="noteId != null">
and n.id = #{noteId}
limit 1
</if>
</select>
</mapper>
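
The NoteIndex class referenced by CanalSchedule (NoteIndex.NAME, NoteIndex.FIELD_NOTE_ID) is not included in this commit. Judging from the columns selected in the SQL above, it is presumably a constants holder along the following lines; the index name and every constant other than FIELD_NOTE_ID are assumptions.

package com.hanserwei.hannote.search.index;

// Hypothetical sketch of the index constants class used by CanalSchedule;
// only NAME and FIELD_NOTE_ID are actually referenced in this diff.
public class NoteIndex {

    // Elasticsearch index name (assumed value)
    public static final String NAME = "note";

    // Document ID source column, matching n.id in the SQL above
    public static final String FIELD_NOTE_ID = "id";

    // Remaining fields mirror the columns returned by selectEsNoteIndexData
    public static final String FIELD_NOTE_TITLE = "title";
    public static final String FIELD_NOTE_TOPIC = "topic";
    public static final String FIELD_NOTE_IMG_URIS = "img_uris";
    public static final String FIELD_NOTE_LIKE_TOTAL = "like_total";
}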