feat(search): 集成 Canal 实现数据库变更监听与词典热更新

- 新增 Canal 客户端配置与连接管理
- 实现 Canal 数据订阅与消费调度任务
- 添加外部词典热更新接口与服务实现- 配置 Elasticsearch词典热更新支持
- 引入 Canal 相关依赖并统一版本管理- 启用 Spring 定时任务支持以驱动 Canal 消费- 增加项目词典以优化拼写检查准确性
This commit is contained in:
2025-11-02 19:02:52 +08:00
parent 7c62f1dcf9
commit 96b4127873
10 changed files with 345 additions and 0 deletions

View File

@@ -2,6 +2,7 @@
<dictionary name="project"> <dictionary name="project">
<words> <words>
<w>asyn</w> <w>asyn</w>
<w>entrys</w>
<w>hannote</w> <w>hannote</w>
<w>hanserwei</w> <w>hanserwei</w>
<w>jobhandler</w> <w>jobhandler</w>

View File

@@ -63,6 +63,20 @@
<groupId>org.elasticsearch.client</groupId> <groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId> <artifactId>elasticsearch-rest-client</artifactId>
</dependency> </dependency>
<!-- Canal -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.common</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@@ -2,8 +2,10 @@ package com.hanserwei.hannote.search;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication @SpringBootApplication
@EnableScheduling
public class HannoteSearchApplication { public class HannoteSearchApplication {
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(HannoteSearchApplication.class, args); SpringApplication.run(HannoteSearchApplication.class, args);

View File

@@ -0,0 +1,64 @@
package com.hanserwei.hannote.search.canal;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.Objects;
@Component
@Slf4j
public class CanalClient implements DisposableBean {
@Resource
private CanalProperties canalProperties;
private CanalConnector canalConnector;
/**
* 实例化 Canal 链接对象
*
* @return CanalConnector
*/
@Bean
public CanalConnector getCanalConnector() {
// Canal 链接地址
String address = canalProperties.getAddress();
String[] addressArr = address.split(":");
// IP 地址
String host = addressArr[0];
// 端口
int port = Integer.parseInt(addressArr[1]);
// 创建一个 CanalConnector 实例,连接到指定的 Canal 服务端
canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress(host, port),
canalProperties.getDestination(),
canalProperties.getUsername(),
canalProperties.getPassword());
// 连接到 Canal 服务端
canalConnector.connect();
// 订阅 Canal 中的数据变化,指定要监听的数据库和表(可以使用表名、数据库名的通配符)
canalConnector.subscribe(canalProperties.getSubscribe());
// 回滚 Canal 消费者的位点,回滚到上次提交的消费位置
canalConnector.rollback();
return canalConnector;
}
/**
* 在 Spring 容器销毁时释放资源
*/
@Override
public void destroy() {
if (Objects.nonNull(canalConnector)) {
// 断开 canalConnector 与 Canal 服务的连接
canalConnector.disconnect();
}
}
}

View File

@@ -0,0 +1,43 @@
package com.hanserwei.hannote.search.canal;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@ConfigurationProperties(prefix = CanalProperties.PREFIX)
@Component
@Data
public class CanalProperties {
public static final String PREFIX = "canal";
/**
* Canal 链接地址
*/
private String address;
/**
* 数据目标
*/
private String destination;
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 订阅规则
*/
private String subscribe;
/**
* 一批次拉取数据量,默认 1000 条
*/
private int batchSize = 1000;
}

View File

@@ -0,0 +1,111 @@
package com.hanserwei.hannote.search.canal;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class CanalSchedule implements Runnable {
@Resource
private CanalProperties canalProperties;
@Resource
private CanalConnector canalConnector;
/**
* 打印字段信息
*
* @param columns 字段信息
*/
private static void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
@Override
@Scheduled(fixedDelay = 100) // 每隔 100ms 被执行一次
public void run() {
// 初始化批次 ID-1 表示未开始或未获取到数据
long batchId = -1;
try {
// 从 canalConnector 获取批量消息,返回的数据量由 batchSize 控制,若不足,则拉取已有的
Message message = canalConnector.getWithoutAck(canalProperties.getBatchSize());
// 获取当前拉取消息的批次 ID
batchId = message.getId();
// 获取当前批次中的数据条数
long size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
// 拉取数据为空,休眠 1s, 防止频繁拉取
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
} else {
// 如果当前批次有数据,打印这批次中的数据条目
printEntry(message.getEntries());
}
// 对当前批次的消息进行 ack 确认,表示该批次的数据已经被成功消费
canalConnector.ack(batchId);
} catch (Exception e) {
log.error("消费 Canal 批次数据异常", e);
// 如果出现异常,需要进行数据回滚,以便重新消费这批次的数据
canalConnector.rollback(batchId);
}
}
/**
* 打印这一批次中的数据条目(和官方示例代码一致,后续小节中会自定义这块)
*
* @param entrys 批次数据
*/
private void printEntry(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChage = null;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry,
e);
}
CanalEntry.EventType eventType = rowChage.getEventType();
System.out.printf("================> binlog[%s:%s] , name[%s,%s] , eventType : %s%n",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType);
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
}

View File

@@ -0,0 +1,26 @@
package com.hanserwei.hannote.search.controller;
import com.hanserwei.framework.biz.operationlog.aspect.ApiOperationLog;
import com.hanserwei.hannote.search.service.ExtDictService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/search")
@Slf4j
public class ExtDictController {
@Resource
private ExtDictService extDictService;
@GetMapping("/ext/dict")
@ApiOperationLog(description = "热更新词典")
public ResponseEntity<String> extDict() {
return extDictService.getHotUpdateExtDict();
}
}

View File

@@ -0,0 +1,13 @@
package com.hanserwei.hannote.search.service;
import org.springframework.http.ResponseEntity;
public interface ExtDictService {
/**
* 获取热更新词典
*
* @return 热更新词典
*/
ResponseEntity<String> getHotUpdateExtDict();
}

View File

@@ -0,0 +1,54 @@
package com.hanserwei.hannote.search.service.impl;
import com.hanserwei.hannote.search.service.ExtDictService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Collectors;
@Service
@Slf4j
public class ExtDictServiceImpl implements ExtDictService {
/**
* 热更新词典路径
*/
@Value("${elasticsearch.hotUpdateExtDict}")
private String hotUpdateExtDict;
@Override
public ResponseEntity<String> getHotUpdateExtDict() {
try {
// 获取文件的最后修改时间
Path path = Paths.get(hotUpdateExtDict);
long lastModifiedTime = Files.getLastModifiedTime(path).toMillis();
// 生成 ETag使用文件内容的哈希值
String fileContent = Files.lines(path).collect(Collectors.joining("\n"));
String eTag = String.valueOf(fileContent.hashCode());
// 设置响应头
HttpHeaders headers = new HttpHeaders();
headers.set("ETag", eTag);
// 设置内容类型为 UTF-8
headers.setContentType(MediaType.valueOf("text/plain;charset=UTF-8"));
// 返回文件内容和 HTTP 头部
return ResponseEntity.ok()
.headers(headers)
.lastModified(lastModifiedTime) // 请求头中设置 Last-Modified
.body(fileContent);
} catch (Exception e) {
log.error("==> 获取热更新词典异常: ", e);
}
return null;
}
}

17
pom.xml
View File

@@ -70,6 +70,7 @@
<xxl-job.version>3.2.0</xxl-job.version> <xxl-job.version>3.2.0</xxl-job.version>
<elasticsearch-java.version>9.2.0</elasticsearch-java.version> <elasticsearch-java.version>9.2.0</elasticsearch-java.version>
<elasticsearch.version>9.2.0</elasticsearch.version> <elasticsearch.version>9.2.0</elasticsearch.version>
<canal.version>1.1.8</canal.version>
</properties> </properties>
<dependencyManagement> <dependencyManagement>
<dependencies> <dependencies>
@@ -314,6 +315,22 @@
<artifactId>elasticsearch-rest-client</artifactId> <artifactId>elasticsearch-rest-client</artifactId>
<version>${elasticsearch.version}</version> <version>${elasticsearch.version}</version>
</dependency> </dependency>
<!-- Canal -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>${canal.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.common</artifactId>
<version>${canal.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>${canal.version}</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>