feat(search): 实现用户索引同步与删除功能

- 新增处理用户事件的逻辑,包括插入和更新操作
- 实现用户文档的删除方法- 添加批量同步用户索引和笔记索引的功能
- 扩展 selectEsNoteIndexData 方法支持按用户 ID 查询
- 新增 selectEsUserIndexData 方法用于查询用户索引数据
- 更新 XML 映射文件以支持新的查询条件和字段
This commit is contained in:
2025-11-03 14:52:51 +08:00
parent 678c8ab8eb
commit 268a009c9b
3 changed files with 143 additions and 3 deletions

View File

@@ -1,16 +1,20 @@
package com.hanserwei.hannote.search.canal;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.IndexRequest;
import co.elastic.clients.elasticsearch.core.IndexResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.common.collect.Maps;
import com.hanserwei.framework.common.enums.StatusEnum;
import com.hanserwei.hannote.search.domain.mapper.SelectMapper;
import com.hanserwei.hannote.search.enums.NoteStatusEnum;
import com.hanserwei.hannote.search.enums.NoteVisibleEnum;
import com.hanserwei.hannote.search.index.NoteIndex;
import com.hanserwei.hannote.search.index.UserIndex;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
@@ -121,7 +125,114 @@ public class CanalSchedule implements Runnable {
* @param eventType 事件类型
*/
private void handleUserEvent(Map<String, Object> columnMap, CanalEntry.EventType eventType) {
// 获取用户 ID
Long userId = Long.parseLong(columnMap.get("id").toString());
// 不同的事件,处理逻辑不同
switch (eventType) {
case INSERT -> syncUserIndex(userId); // 记录新增事件
case UPDATE -> { // 记录更新事件
// 用户变更后的状态
Integer status = Integer.parseInt(columnMap.get("status").toString());
// 逻辑删除
Integer isDeleted = Integer.parseInt(columnMap.get("is_deleted").toString());
if (Objects.equals(status, StatusEnum.ENABLE.getValue())
&& Objects.equals(isDeleted, 0)) { // 用户状态为已启用,并且未被逻辑删除
// 更新用户索引、笔记索引
syncNotesIndexAndUserIndex(userId);
} else if (Objects.equals(status, StatusEnum.DISABLED.getValue()) // 用户状态为禁用
|| Objects.equals(isDeleted, 1)) { // 被逻辑删除
// 删除用户文档
deleteUserDocument(String.valueOf(userId));
}
}
default -> log.warn("Unhandled event type for t_user: {}", eventType);
}
}
/**
* 删除用户文档
*
* @param documentId 文档 ID
*/
private void deleteUserDocument(String documentId) {
try {
client.delete(d -> d.index(UserIndex.NAME).id(documentId));
} catch (IOException e) {
log.error("删除用户文档异常", e);
}
}
/**
* 同步用户索引、笔记索引(可能是多条)
*
* @param userId 用户 ID
*/
private void syncNotesIndexAndUserIndex(Long userId) {
BulkRequest.Builder bulkRequestBuilder = new BulkRequest.Builder();
// 1. 用户索引
List<Map<String, Object>> userResult = selectMapper.selectEsUserIndexData(userId);
// 遍历查询结果,将每条记录同步到 Elasticsearch
for (Map<String, Object> recordMap : userResult) {
// 构建 BulkOperation
BulkOperation op = BulkOperation.of(b -> b
.index(i -> i
.index(UserIndex.NAME)
.id(String.valueOf(recordMap.get(UserIndex.FIELD_USER_ID)))
.document(recordMap)
)
);
// 通过 operations() 方法添加到 Builder
bulkRequestBuilder.operations(op);
}
List<Map<String, Object>> noteResult = selectMapper.selectEsNoteIndexData(null, userId);
for (Map<String, Object> recordMap : noteResult) {
BulkOperation op = BulkOperation.of(b -> b
.index(i -> i
.index(NoteIndex.NAME)
.id(String.valueOf(recordMap.get(NoteIndex.FIELD_NOTE_ID)))
.document(recordMap)
)
);
bulkRequestBuilder.operations(op);
}
// 构建 BulkRequest
BulkRequest bulkRequest = bulkRequestBuilder.build();
try {
client.bulk(bulkRequest);
} catch (IOException e) {
log.error("执行批量请求出错:", e);
}
}
/**
* 同步用户索引数据
*
* @param userId 用户 ID
*/
private void syncUserIndex(Long userId) {
// 1. 同步用户索引
List<Map<String, Object>> userResult = selectMapper.selectEsUserIndexData(userId);
// 遍历查询结果,将每条记录同步到 Elasticsearch
for (Map<String, Object> recordMap : userResult) {
// 创建索引请求对象,指定索引名称
IndexRequest<Object> request = IndexRequest.of(indexRequest -> indexRequest
// 指定索引名称
.index(UserIndex.NAME)
// 设置文档的 ID使用记录中的主键 “id” 字段值
.id(String.valueOf(recordMap.get(UserIndex.FIELD_USER_ID)))
// 设置文档的内容,使用查询结果的记录数据
.document(recordMap));
// 将数据写入 Elasticsearch 索引
try {
client.index(request);
} catch (IOException e) {
log.error("写入索引异常", e);
}
}
}
/**
@@ -181,7 +292,7 @@ public class CanalSchedule implements Runnable {
*/
private void syncNoteIndex(Long noteId) {
// 从数据库查询 Elasticsearch 索引数据
List<Map<String, Object>> result = selectMapper.selectEsNoteIndexData(noteId);
List<Map<String, Object>> result = selectMapper.selectEsNoteIndexData(noteId, null);
// 遍历查询结果,将每条记录同步到 Elasticsearch
for (Map<String, Object> recordMap : result) {
// 创建索引请求对象,指定索引名称

View File

@@ -9,11 +9,19 @@ import java.util.Map;
@Mapper
public interface SelectMapper {
/**
* 查询笔记文档所需的全字段数据
*
* @param noteId 笔记 ID
* @return 笔记文档所需全字段数据
*/
List<Map<String, Object>> selectEsNoteIndexData(@Param("noteId") long noteId);
List<Map<String, Object>> selectEsNoteIndexData(@Param("noteId") Long noteId, @Param("userId") Long userId);
/**
* 查询用户索引所需的全字段数据
*
* @param userId 用户 ID
* @return 用户索引所需全字段数据
*/
List<Map<String, Object>> selectEsUserIndexData(@Param("userId") Long userId);
}

View File

@@ -22,7 +22,28 @@
and n.`status` = 1
<if test="noteId != null">
and n.id = #{noteId}
</if>
<if test="userId != null">
and u.id = #{userId}
</if>
<if test="noteId != null">
limit 1
</if>
</select>
<select id="selectEsUserIndexData" resultType="map" parameterType="map">
select u.id,
u.nickname,
u.avatar,
u.han_note_id,
IFNULL(uc.note_total, 0) as note_total,
IFNULL(uc.fans_total, 0) as fans_total
from t_user u
left join t_user_count uc on u.id = uc.user_id
where u.`status` = 0
and u.is_deleted = 0
<if test="userId != null">
and u.id = #{userId}
</if>
</select>
</mapper>