diff --git a/han-note-search/src/main/java/com/hanserwei/hannote/search/canal/CanalSchedule.java b/han-note-search/src/main/java/com/hanserwei/hannote/search/canal/CanalSchedule.java index 96e8a99..eb35447 100644 --- a/han-note-search/src/main/java/com/hanserwei/hannote/search/canal/CanalSchedule.java +++ b/han-note-search/src/main/java/com/hanserwei/hannote/search/canal/CanalSchedule.java @@ -1,16 +1,20 @@ package com.hanserwei.hannote.search.canal; import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch.core.BulkRequest; import co.elastic.clients.elasticsearch.core.IndexRequest; import co.elastic.clients.elasticsearch.core.IndexResponse; +import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.common.collect.Maps; +import com.hanserwei.framework.common.enums.StatusEnum; import com.hanserwei.hannote.search.domain.mapper.SelectMapper; import com.hanserwei.hannote.search.enums.NoteStatusEnum; import com.hanserwei.hannote.search.enums.NoteVisibleEnum; import com.hanserwei.hannote.search.index.NoteIndex; +import com.hanserwei.hannote.search.index.UserIndex; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Scheduled; @@ -121,7 +125,114 @@ public class CanalSchedule implements Runnable { * @param eventType 事件类型 */ private void handleUserEvent(Map columnMap, CanalEntry.EventType eventType) { + // 获取用户 ID + Long userId = Long.parseLong(columnMap.get("id").toString()); + // 不同的事件,处理逻辑不同 + switch (eventType) { + case INSERT -> syncUserIndex(userId); // 记录新增事件 + case UPDATE -> { // 记录更新事件 + // 用户变更后的状态 + Integer status = Integer.parseInt(columnMap.get("status").toString()); + // 逻辑删除 + Integer isDeleted = Integer.parseInt(columnMap.get("is_deleted").toString()); + + if (Objects.equals(status, StatusEnum.ENABLE.getValue()) + && Objects.equals(isDeleted, 0)) { // 用户状态为已启用,并且未被逻辑删除 + // 更新用户索引、笔记索引 + syncNotesIndexAndUserIndex(userId); + } else if (Objects.equals(status, StatusEnum.DISABLED.getValue()) // 用户状态为禁用 + || Objects.equals(isDeleted, 1)) { // 被逻辑删除 + // 删除用户文档 + deleteUserDocument(String.valueOf(userId)); + } + } + default -> log.warn("Unhandled event type for t_user: {}", eventType); + } + } + + /** + * 删除用户文档 + * + * @param documentId 文档 ID + */ + private void deleteUserDocument(String documentId) { + try { + client.delete(d -> d.index(UserIndex.NAME).id(documentId)); + } catch (IOException e) { + log.error("删除用户文档异常", e); + } + } + + /** + * 同步用户索引、笔记索引(可能是多条) + * + * @param userId 用户 ID + */ + private void syncNotesIndexAndUserIndex(Long userId) { + BulkRequest.Builder bulkRequestBuilder = new BulkRequest.Builder(); + // 1. 用户索引 + List> userResult = selectMapper.selectEsUserIndexData(userId); + // 遍历查询结果,将每条记录同步到 Elasticsearch + for (Map recordMap : userResult) { + // 构建 BulkOperation + BulkOperation op = BulkOperation.of(b -> b + .index(i -> i + .index(UserIndex.NAME) + .id(String.valueOf(recordMap.get(UserIndex.FIELD_USER_ID))) + .document(recordMap) + ) + ); + // 通过 operations() 方法添加到 Builder + bulkRequestBuilder.operations(op); + } + List> noteResult = selectMapper.selectEsNoteIndexData(null, userId); + for (Map recordMap : noteResult) { + BulkOperation op = BulkOperation.of(b -> b + .index(i -> i + .index(NoteIndex.NAME) + .id(String.valueOf(recordMap.get(NoteIndex.FIELD_NOTE_ID))) + .document(recordMap) + ) + ); + bulkRequestBuilder.operations(op); + } + + // 构建 BulkRequest + BulkRequest bulkRequest = bulkRequestBuilder.build(); + try { + client.bulk(bulkRequest); + } catch (IOException e) { + log.error("执行批量请求出错:", e); + } + } + + /** + * 同步用户索引数据 + * + * @param userId 用户 ID + */ + private void syncUserIndex(Long userId) { + // 1. 同步用户索引 + List> userResult = selectMapper.selectEsUserIndexData(userId); + + // 遍历查询结果,将每条记录同步到 Elasticsearch + for (Map recordMap : userResult) { + // 创建索引请求对象,指定索引名称 + IndexRequest request = IndexRequest.of(indexRequest -> indexRequest + // 指定索引名称 + .index(UserIndex.NAME) + // 设置文档的 ID,使用记录中的主键 “id” 字段值 + .id(String.valueOf(recordMap.get(UserIndex.FIELD_USER_ID))) + // 设置文档的内容,使用查询结果的记录数据 + .document(recordMap)); + // 将数据写入 Elasticsearch 索引 + try { + client.index(request); + } catch (IOException e) { + log.error("写入索引异常", e); + } + } } /** @@ -181,7 +292,7 @@ public class CanalSchedule implements Runnable { */ private void syncNoteIndex(Long noteId) { // 从数据库查询 Elasticsearch 索引数据 - List> result = selectMapper.selectEsNoteIndexData(noteId); + List> result = selectMapper.selectEsNoteIndexData(noteId, null); // 遍历查询结果,将每条记录同步到 Elasticsearch for (Map recordMap : result) { // 创建索引请求对象,指定索引名称 diff --git a/han-note-search/src/main/java/com/hanserwei/hannote/search/domain/mapper/SelectMapper.java b/han-note-search/src/main/java/com/hanserwei/hannote/search/domain/mapper/SelectMapper.java index a9de640..c9428b7 100644 --- a/han-note-search/src/main/java/com/hanserwei/hannote/search/domain/mapper/SelectMapper.java +++ b/han-note-search/src/main/java/com/hanserwei/hannote/search/domain/mapper/SelectMapper.java @@ -9,11 +9,19 @@ import java.util.Map; @Mapper public interface SelectMapper { + /** * 查询笔记文档所需的全字段数据 - * * @param noteId 笔记 ID * @return 笔记文档所需全字段数据 */ - List> selectEsNoteIndexData(@Param("noteId") long noteId); + List> selectEsNoteIndexData(@Param("noteId") Long noteId, @Param("userId") Long userId); + + /** + * 查询用户索引所需的全字段数据 + * + * @param userId 用户 ID + * @return 用户索引所需全字段数据 + */ + List> selectEsUserIndexData(@Param("userId") Long userId); } \ No newline at end of file diff --git a/han-note-search/src/main/resources/mapperxml/SelectMapper.xml b/han-note-search/src/main/resources/mapperxml/SelectMapper.xml index d2f41fc..9b8c8c5 100644 --- a/han-note-search/src/main/resources/mapperxml/SelectMapper.xml +++ b/han-note-search/src/main/resources/mapperxml/SelectMapper.xml @@ -22,7 +22,28 @@ and n.`status` = 1 and n.id = #{noteId} + + + and u.id = #{userId} + + limit 1 + + \ No newline at end of file