From 96b412787380597796f53a2a193c3768c80a9530 Mon Sep 17 00:00:00 2001 From: Hanserwei <2628273921@qq.com> Date: Sun, 2 Nov 2025 19:02:52 +0800 Subject: [PATCH] =?UTF-8?q?feat(search):=20=E9=9B=86=E6=88=90=20Canal=20?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E6=95=B0=E6=8D=AE=E5=BA=93=E5=8F=98=E6=9B=B4?= =?UTF-8?q?=E7=9B=91=E5=90=AC=E4=B8=8E=E8=AF=8D=E5=85=B8=E7=83=AD=E6=9B=B4?= =?UTF-8?q?=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 Canal 客户端配置与连接管理 - 实现 Canal 数据订阅与消费调度任务 - 添加外部词典热更新接口与服务实现- 配置 Elasticsearch词典热更新支持 - 引入 Canal 相关依赖并统一版本管理- 启用 Spring 定时任务支持以驱动 Canal 消费- 增加项目词典以优化拼写检查准确性 --- .idea/dictionaries/project.xml | 1 + han-note-search/pom.xml | 14 +++ .../search/HannoteSearchApplication.java | 2 + .../hannote/search/canal/CanalClient.java | 64 ++++++++++ .../hannote/search/canal/CanalProperties.java | 43 +++++++ .../hannote/search/canal/CanalSchedule.java | 111 ++++++++++++++++++ .../search/controller/ExtDictController.java | 26 ++++ .../search/service/ExtDictService.java | 13 ++ .../service/impl/ExtDictServiceImpl.java | 54 +++++++++ pom.xml | 17 +++ 10 files changed, 345 insertions(+) create mode 100644 han-note-search/src/main/java/com/hanserwei/hannote/search/canal/CanalClient.java create mode 100644 han-note-search/src/main/java/com/hanserwei/hannote/search/canal/CanalProperties.java create mode 100644 han-note-search/src/main/java/com/hanserwei/hannote/search/canal/CanalSchedule.java create mode 100644 han-note-search/src/main/java/com/hanserwei/hannote/search/controller/ExtDictController.java create mode 100644 han-note-search/src/main/java/com/hanserwei/hannote/search/service/ExtDictService.java create mode 100644 han-note-search/src/main/java/com/hanserwei/hannote/search/service/impl/ExtDictServiceImpl.java diff --git a/.idea/dictionaries/project.xml b/.idea/dictionaries/project.xml index 2f75dd8..70f0f46 100644 --- a/.idea/dictionaries/project.xml +++ b/.idea/dictionaries/project.xml @@ -2,6 +2,7 @@ asyn + entrys hannote hanserwei jobhandler diff --git a/han-note-search/pom.xml b/han-note-search/pom.xml index b3eb16b..9bbb613 100644 --- a/han-note-search/pom.xml +++ b/han-note-search/pom.xml @@ -63,6 +63,20 @@ org.elasticsearch.client elasticsearch-rest-client + + + com.alibaba.otter + canal.client + + + com.alibaba.otter + canal.common + + + com.alibaba.otter + canal.protocol + + diff --git a/han-note-search/src/main/java/com/hanserwei/hannote/search/HannoteSearchApplication.java b/han-note-search/src/main/java/com/hanserwei/hannote/search/HannoteSearchApplication.java index e67042a..0a49982 100644 --- a/han-note-search/src/main/java/com/hanserwei/hannote/search/HannoteSearchApplication.java +++ b/han-note-search/src/main/java/com/hanserwei/hannote/search/HannoteSearchApplication.java @@ -2,8 +2,10 @@ package com.hanserwei.hannote.search; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication +@EnableScheduling public class HannoteSearchApplication { public static void main(String[] args) { SpringApplication.run(HannoteSearchApplication.class, args); diff --git a/han-note-search/src/main/java/com/hanserwei/hannote/search/canal/CanalClient.java b/han-note-search/src/main/java/com/hanserwei/hannote/search/canal/CanalClient.java new file mode 100644 index 0000000..360aa85 --- /dev/null +++ b/han-note-search/src/main/java/com/hanserwei/hannote/search/canal/CanalClient.java @@ -0,0 +1,64 @@ +package com.hanserwei.hannote.search.canal; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.alibaba.otter.canal.client.CanalConnectors; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +import java.net.InetSocketAddress; +import java.util.Objects; + +@Component +@Slf4j +public class CanalClient implements DisposableBean { + + @Resource + private CanalProperties canalProperties; + + private CanalConnector canalConnector; + + /** + * 实例化 Canal 链接对象 + * + * @return CanalConnector + */ + @Bean + public CanalConnector getCanalConnector() { + // Canal 链接地址 + String address = canalProperties.getAddress(); + String[] addressArr = address.split(":"); + // IP 地址 + String host = addressArr[0]; + // 端口 + int port = Integer.parseInt(addressArr[1]); + + // 创建一个 CanalConnector 实例,连接到指定的 Canal 服务端 + canalConnector = CanalConnectors.newSingleConnector( + new InetSocketAddress(host, port), + canalProperties.getDestination(), + canalProperties.getUsername(), + canalProperties.getPassword()); + + // 连接到 Canal 服务端 + canalConnector.connect(); + // 订阅 Canal 中的数据变化,指定要监听的数据库和表(可以使用表名、数据库名的通配符) + canalConnector.subscribe(canalProperties.getSubscribe()); + // 回滚 Canal 消费者的位点,回滚到上次提交的消费位置 + canalConnector.rollback(); + return canalConnector; + } + + /** + * 在 Spring 容器销毁时释放资源 + */ + @Override + public void destroy() { + if (Objects.nonNull(canalConnector)) { + // 断开 canalConnector 与 Canal 服务的连接 + canalConnector.disconnect(); + } + } +} \ No newline at end of file diff --git a/han-note-search/src/main/java/com/hanserwei/hannote/search/canal/CanalProperties.java b/han-note-search/src/main/java/com/hanserwei/hannote/search/canal/CanalProperties.java new file mode 100644 index 0000000..4d2b0bf --- /dev/null +++ b/han-note-search/src/main/java/com/hanserwei/hannote/search/canal/CanalProperties.java @@ -0,0 +1,43 @@ +package com.hanserwei.hannote.search.canal; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +@ConfigurationProperties(prefix = CanalProperties.PREFIX) +@Component +@Data +public class CanalProperties { + + public static final String PREFIX = "canal"; + + /** + * Canal 链接地址 + */ + private String address; + + /** + * 数据目标 + */ + private String destination; + + /** + * 用户名 + */ + private String username; + + /** + * 密码 + */ + private String password; + + /** + * 订阅规则 + */ + private String subscribe; + + /** + * 一批次拉取数据量,默认 1000 条 + */ + private int batchSize = 1000; +} \ No newline at end of file diff --git a/han-note-search/src/main/java/com/hanserwei/hannote/search/canal/CanalSchedule.java b/han-note-search/src/main/java/com/hanserwei/hannote/search/canal/CanalSchedule.java new file mode 100644 index 0000000..088f6ae --- /dev/null +++ b/han-note-search/src/main/java/com/hanserwei/hannote/search/canal/CanalSchedule.java @@ -0,0 +1,111 @@ +package com.hanserwei.hannote.search.canal; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.alibaba.otter.canal.protocol.Message; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +@Component +@Slf4j +public class CanalSchedule implements Runnable { + + @Resource + private CanalProperties canalProperties; + @Resource + private CanalConnector canalConnector; + + /** + * 打印字段信息 + * + * @param columns 字段信息 + */ + private static void printColumn(List columns) { + for (CanalEntry.Column column : columns) { + System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); + } + } + + @Override + @Scheduled(fixedDelay = 100) // 每隔 100ms 被执行一次 + public void run() { + // 初始化批次 ID,-1 表示未开始或未获取到数据 + long batchId = -1; + try { + // 从 canalConnector 获取批量消息,返回的数据量由 batchSize 控制,若不足,则拉取已有的 + Message message = canalConnector.getWithoutAck(canalProperties.getBatchSize()); + + // 获取当前拉取消息的批次 ID + batchId = message.getId(); + + // 获取当前批次中的数据条数 + long size = message.getEntries().size(); + if (batchId == -1 || size == 0) { + try { + // 拉取数据为空,休眠 1s, 防止频繁拉取 + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + } + } else { + // 如果当前批次有数据,打印这批次中的数据条目 + printEntry(message.getEntries()); + } + + // 对当前批次的消息进行 ack 确认,表示该批次的数据已经被成功消费 + canalConnector.ack(batchId); + } catch (Exception e) { + log.error("消费 Canal 批次数据异常", e); + // 如果出现异常,需要进行数据回滚,以便重新消费这批次的数据 + canalConnector.rollback(batchId); + } + } + + /** + * 打印这一批次中的数据条目(和官方示例代码一致,后续小节中会自定义这块) + * + * @param entrys 批次数据 + */ + private void printEntry(List entrys) { + for (CanalEntry.Entry entry : entrys) { + if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN + || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { + continue; + } + + CanalEntry.RowChange rowChage = null; + try { + rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); + } catch (Exception e) { + throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry, + e); + } + + CanalEntry.EventType eventType = rowChage.getEventType(); + System.out.printf("================> binlog[%s:%s] , name[%s,%s] , eventType : %s%n", + entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), + entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), + eventType); + + for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) { + if (eventType == CanalEntry.EventType.DELETE) { + printColumn(rowData.getBeforeColumnsList()); + } else if (eventType == CanalEntry.EventType.INSERT) { + printColumn(rowData.getAfterColumnsList()); + } else { + System.out.println("-------> before"); + printColumn(rowData.getBeforeColumnsList()); + System.out.println("-------> after"); + printColumn(rowData.getAfterColumnsList()); + } + } + } + + + } + +} \ No newline at end of file diff --git a/han-note-search/src/main/java/com/hanserwei/hannote/search/controller/ExtDictController.java b/han-note-search/src/main/java/com/hanserwei/hannote/search/controller/ExtDictController.java new file mode 100644 index 0000000..9dfdc1e --- /dev/null +++ b/han-note-search/src/main/java/com/hanserwei/hannote/search/controller/ExtDictController.java @@ -0,0 +1,26 @@ +package com.hanserwei.hannote.search.controller; + +import com.hanserwei.framework.biz.operationlog.aspect.ApiOperationLog; +import com.hanserwei.hannote.search.service.ExtDictService; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/search") +@Slf4j +public class ExtDictController { + + @Resource + private ExtDictService extDictService; + + @GetMapping("/ext/dict") + @ApiOperationLog(description = "热更新词典") + public ResponseEntity extDict() { + return extDictService.getHotUpdateExtDict(); + } + +} \ No newline at end of file diff --git a/han-note-search/src/main/java/com/hanserwei/hannote/search/service/ExtDictService.java b/han-note-search/src/main/java/com/hanserwei/hannote/search/service/ExtDictService.java new file mode 100644 index 0000000..64255a8 --- /dev/null +++ b/han-note-search/src/main/java/com/hanserwei/hannote/search/service/ExtDictService.java @@ -0,0 +1,13 @@ +package com.hanserwei.hannote.search.service; + +import org.springframework.http.ResponseEntity; + +public interface ExtDictService { + + /** + * 获取热更新词典 + * + * @return 热更新词典 + */ + ResponseEntity getHotUpdateExtDict(); +} \ No newline at end of file diff --git a/han-note-search/src/main/java/com/hanserwei/hannote/search/service/impl/ExtDictServiceImpl.java b/han-note-search/src/main/java/com/hanserwei/hannote/search/service/impl/ExtDictServiceImpl.java new file mode 100644 index 0000000..041210b --- /dev/null +++ b/han-note-search/src/main/java/com/hanserwei/hannote/search/service/impl/ExtDictServiceImpl.java @@ -0,0 +1,54 @@ +package com.hanserwei.hannote.search.service.impl; + +import com.hanserwei.hannote.search.service.ExtDictService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Service; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.stream.Collectors; + +@Service +@Slf4j +public class ExtDictServiceImpl implements ExtDictService { + + /** + * 热更新词典路径 + */ + @Value("${elasticsearch.hotUpdateExtDict}") + private String hotUpdateExtDict; + + @Override + public ResponseEntity getHotUpdateExtDict() { + try { + // 获取文件的最后修改时间 + Path path = Paths.get(hotUpdateExtDict); + long lastModifiedTime = Files.getLastModifiedTime(path).toMillis(); + + // 生成 ETag(使用文件内容的哈希值) + String fileContent = Files.lines(path).collect(Collectors.joining("\n")); + String eTag = String.valueOf(fileContent.hashCode()); + + // 设置响应头 + HttpHeaders headers = new HttpHeaders(); + headers.set("ETag", eTag); + + // 设置内容类型为 UTF-8 + headers.setContentType(MediaType.valueOf("text/plain;charset=UTF-8")); + + // 返回文件内容和 HTTP 头部 + return ResponseEntity.ok() + .headers(headers) + .lastModified(lastModifiedTime) // 请求头中设置 Last-Modified + .body(fileContent); + } catch (Exception e) { + log.error("==> 获取热更新词典异常: ", e); + } + return null; + } +} diff --git a/pom.xml b/pom.xml index 05432ad..8e2ba9b 100755 --- a/pom.xml +++ b/pom.xml @@ -70,6 +70,7 @@ 3.2.0 9.2.0 9.2.0 + 1.1.8 @@ -314,6 +315,22 @@ elasticsearch-rest-client ${elasticsearch.version} + + + com.alibaba.otter + canal.client + ${canal.version} + + + com.alibaba.otter + canal.common + ${canal.version} + + + com.alibaba.otter + canal.protocol + ${canal.version} +