From 678c8ab8eb327b4dbd30495a60d9c5aca1596cc0 Mon Sep 17 00:00:00 2001
From: Hanserwei <2628273921@qq.com>
Date: Mon, 3 Nov 2025 14:22:55 +0800
Subject: [PATCH] =?UTF-8?q?feat(search):=20=E5=AE=9E=E7=8E=B0=20Canal=20?=
=?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=90=8C=E6=AD=A5=E5=88=B0=20Elasticsearch?=
=?UTF-8?q?=20=E5=8A=9F=E8=83=BD?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
- 添加 Elasticsearch 客户端依赖及配置
- 实现 Canal 数据监听与解析逻辑
- 新增笔记索引同步与删除处理
- 添加 MyBatis Mapper 扫描与数据源配置
- 定义笔记状态与可见性枚举类
- 配置 MyBatis XML 映射文件路径
---
.idea/MyBatisCodeHelperDatasource.xml | 3 +-
han-note-search/pom.xml | 14 +-
.../search/HannoteSearchApplication.java | 2 +
.../hannote/search/canal/CanalSchedule.java | 193 ++++++++++++++----
.../search/domain/mapper/SelectMapper.java | 19 ++
.../hannote/search/enums/NoteStatusEnum.java | 18 ++
.../hannote/search/enums/NoteVisibleEnum.java | 15 ++
.../main/resources/mapperxml/SelectMapper.xml | 28 +++
8 files changed, 247 insertions(+), 45 deletions(-)
create mode 100644 han-note-search/src/main/java/com/hanserwei/hannote/search/domain/mapper/SelectMapper.java
create mode 100644 han-note-search/src/main/java/com/hanserwei/hannote/search/enums/NoteStatusEnum.java
create mode 100644 han-note-search/src/main/java/com/hanserwei/hannote/search/enums/NoteVisibleEnum.java
create mode 100644 han-note-search/src/main/resources/mapperxml/SelectMapper.xml
diff --git a/.idea/MyBatisCodeHelperDatasource.xml b/.idea/MyBatisCodeHelperDatasource.xml
index 8d1e3bb..ed5a7b9 100644
--- a/.idea/MyBatisCodeHelperDatasource.xml
+++ b/.idea/MyBatisCodeHelperDatasource.xml
@@ -20,11 +20,12 @@
-
+
diff --git a/han-note-search/pom.xml b/han-note-search/pom.xml
index 9bbb613..778d6c9 100644
--- a/han-note-search/pom.xml
+++ b/han-note-search/pom.xml
@@ -76,7 +76,19 @@
com.alibaba.otter
canal.protocol
-
+
+ com.baomidou
+ mybatis-plus-spring-boot3-starter
+
+
+ com.mysql
+ mysql-connector-j
+
+
+
+ com.alibaba
+ druid-spring-boot-3-starter
+
diff --git a/han-note-search/src/main/java/com/hanserwei/hannote/search/HannoteSearchApplication.java b/han-note-search/src/main/java/com/hanserwei/hannote/search/HannoteSearchApplication.java
index 0a49982..fa547fe 100644
--- a/han-note-search/src/main/java/com/hanserwei/hannote/search/HannoteSearchApplication.java
+++ b/han-note-search/src/main/java/com/hanserwei/hannote/search/HannoteSearchApplication.java
@@ -1,11 +1,13 @@
package com.hanserwei.hannote.search;
+import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
+@MapperScan("com.hanserwei.hannote.search.domain.mapper")
public class HannoteSearchApplication {
public static void main(String[] args) {
SpringApplication.run(HannoteSearchApplication.class, args);
diff --git a/han-note-search/src/main/java/com/hanserwei/hannote/search/canal/CanalSchedule.java b/han-note-search/src/main/java/com/hanserwei/hannote/search/canal/CanalSchedule.java
index e6a2828..96e8a99 100644
--- a/han-note-search/src/main/java/com/hanserwei/hannote/search/canal/CanalSchedule.java
+++ b/han-note-search/src/main/java/com/hanserwei/hannote/search/canal/CanalSchedule.java
@@ -1,14 +1,25 @@
package com.hanserwei.hannote.search.canal;
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch.core.IndexRequest;
+import co.elastic.clients.elasticsearch.core.IndexResponse;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
+import com.google.common.collect.Maps;
+import com.hanserwei.hannote.search.domain.mapper.SelectMapper;
+import com.hanserwei.hannote.search.enums.NoteStatusEnum;
+import com.hanserwei.hannote.search.enums.NoteVisibleEnum;
+import com.hanserwei.hannote.search.index.NoteIndex;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
+import java.io.IOException;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
@Component
@@ -19,17 +30,10 @@ public class CanalSchedule implements Runnable {
private CanalProperties canalProperties;
@Resource
private CanalConnector canalConnector;
-
- /**
- * 打印字段信息
- *
- * @param columns 字段信息
- */
- private static void printColumn(List columns) {
- for (CanalEntry.Column column : columns) {
- System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
- }
- }
+ @Resource
+ private SelectMapper selectMapper;
+ @Resource
+ private ElasticsearchClient client;
@Override
@Scheduled(fixedDelay = 100) // 每隔 100ms 被执行一次
@@ -50,11 +54,11 @@ public class CanalSchedule implements Runnable {
// 拉取数据为空,休眠 1s, 防止频繁拉取
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
- log.error("消费 Canal 批次数据异常", e);
+ log.error("休眠异常", e);
}
} else {
- // 如果当前批次有数据,打印这批次中的数据条目
- printEntry(message.getEntries());
+ // 如果当前批次有数据,处理这批次数据
+ processEntry(message.getEntries());
}
// 对当前批次的消息进行 ack 确认,表示该批次的数据已经被成功消费
@@ -67,46 +71,149 @@ public class CanalSchedule implements Runnable {
}
/**
- * 打印这一批次中的数据条目(和官方示例代码一致,后续小节中会自定义这块)
- *
+ * 处理这一批次数据
* @param entrys 批次数据
*/
- private void printEntry(List entrys) {
+ private void processEntry(List entrys) throws Exception {
+ // 循环处理批次数据
for (CanalEntry.Entry entry : entrys) {
- if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
- || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
- continue;
- }
+ // 只处理 ROWDATA 行数据类型的 Entry,忽略事务等其他类型
+ if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
+ // 获取事件类型(如:INSERT、UPDATE、DELETE 等等)
+ CanalEntry.EventType eventType = entry.getHeader().getEventType();
+ // 获取数据库名称
+ String database = entry.getHeader().getSchemaName();
+ // 获取表名称
+ String table = entry.getHeader().getTableName();
- CanalEntry.RowChange rowChage;
- try {
- rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
- } catch (Exception e) {
- throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry,
- e);
- }
+ // 解析出 RowChange 对象,包含 RowData 和事件相关信息
+ CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
- CanalEntry.EventType eventType = rowChage.getEventType();
- System.out.printf("================> binlog[%s:%s] , name[%s,%s] , eventType : %s%n",
- entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
- entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
- eventType);
+ // 遍历所有行数据(RowData)
+ for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
+ // 获取行中所有列的最新值(AfterColumns)
+ List columns = rowData.getAfterColumnsList();
- for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
- if (eventType == CanalEntry.EventType.DELETE) {
- printColumn(rowData.getBeforeColumnsList());
- } else if (eventType == CanalEntry.EventType.INSERT) {
- printColumn(rowData.getAfterColumnsList());
- } else {
- System.out.println("-------> before");
- printColumn(rowData.getBeforeColumnsList());
- System.out.println("-------> after");
- printColumn(rowData.getAfterColumnsList());
+ // 将列数据解析为 Map,方便后续处理
+ Map columnMap = parseColumns2Map(columns);
+
+ // 自定义处理
+ log.info("EventType: {}, Database: {}, Table: {}, Columns: {}", eventType, database, table, columnMap);
+ // 处理事件
+ processEvent(columnMap, table, eventType);
}
}
}
+ }
+ private void processEvent(Map columnMap, String table, CanalEntry.EventType eventType) {
+ switch (table) {
+ case "t_note" -> handleNoteEvent(columnMap, eventType); // 笔记表
+ case "t_user" -> handleUserEvent(columnMap, eventType); // 用户表
+ default -> log.warn("Table: {} not support", table);
+ }
+ }
+
+ /**
+ * 处理用户表事件
+ *
+ * @param columnMap 列数据
+ * @param eventType 事件类型
+ */
+ private void handleUserEvent(Map columnMap, CanalEntry.EventType eventType) {
}
+ /**
+ * 处理笔记表事件
+ *
+ * @param columnMap 列数据
+ * @param eventType 事件类型
+ */
+ private void handleNoteEvent(Map columnMap, CanalEntry.EventType eventType) {
+ // 获取笔记 ID
+ Long noteId = Long.parseLong(columnMap.get("id").toString());
+
+ // 不同的事件,处理逻辑不同
+ switch (eventType) {
+ case INSERT -> syncNoteIndex(noteId); // 记录新增事件
+ case UPDATE -> {
+ // 记录更新事件
+ // 笔记变更后的状态
+ Integer status = Integer.parseInt(columnMap.get("status").toString());
+ // 笔记可见范围
+ Integer visible = Integer.parseInt(columnMap.get("visible").toString());
+
+ if (Objects.equals(status, NoteStatusEnum.NORMAL.getCode())
+ && Objects.equals(visible, NoteVisibleEnum.PUBLIC.getCode())) { // 正常展示,并且可见性为公开
+ // 对索引进行覆盖更新
+ syncNoteIndex(noteId);
+ } else if (Objects.equals(visible, NoteVisibleEnum.PRIVATE.getCode()) // 仅对自己可见
+ || Objects.equals(status, NoteStatusEnum.DELETED.getCode())
+ || Objects.equals(status, NoteStatusEnum.DOWNED.getCode())) { // 被逻辑删除、被下架
+ // 删除笔记文档
+ deleteNoteDocument(String.valueOf(noteId));
+ }
+ }
+ default -> log.warn("Unhandled event type for t_note: {}", eventType);
+ }
+ }
+
+ /**
+ * 删除笔记文档
+ *
+ * @param documentId 文档ID
+ */
+ private void deleteNoteDocument(String documentId) {
+ try {
+ client.delete(deleteRequest -> deleteRequest
+ .index(NoteIndex.NAME)
+ .id(documentId));
+ } catch (IOException e) {
+ log.error("删除笔记文档异常", e);
+ }
+ }
+
+ /**
+ * 同步笔记索引
+ *
+ * @param noteId 笔记ID
+ */
+ private void syncNoteIndex(Long noteId) {
+ // 从数据库查询 Elasticsearch 索引数据
+ List