网页数据检索与问答流程整合
在之前的工作中,我们已经按照 java-maxkb 的标准对网页数据进行了预处理,生成了符合向量化检索要求的段落数据。接下来,可以直接基于这些预处理数据进行检索,并将检索结果整合到问答流程中,从而实现对用户问题的快速响应。
本篇文档主要介绍两部分内容:
- 网页数据检索:通过调用预处理好的段落数据服务进行向量检索,获取与用户问题相似度较高的段落数据。
- 问答流程整合:将检索结果整合到问答系统中,经过问题重写、历史对话记录、结果过滤与排序后生成最终的提示词,并通过 WebSocket 或 SSE 返回给前端。
背景介绍
由于之前已经按照 java-maxkb 的标准对网页数据进行了预处理,现在每个网页已经拆分为多个段落,并生成了相应的向量数据。基于这些数据,我们可以直接进行向量检索。检索时会根据设定的相似度、返回的 top N 结果以及用户问题进行匹配,最终得到最相关的段落内容。接着,将检索结果整合到问答流程中,为后续回答提供依据。
单元测试:段落检索服务
下面的单元测试代码展示了如何调用 MaxKbParagraphRetrieveService 进行向量检索,并输出检索结果(即检索到的段落列表)。
该测试类使用 TioBootTest 加载系统配置,通过 AOP 获取段落检索服务实例,并调用 search 方法查询指定数据集下的相关段落。
package com.litongjava.college.hawaii.kapiolani;
import org.junit.Test;
import com.litongjava.jfinal.aop.Aop;
import com.litongjava.max.search.config.AdminAppConfig;
import com.litongjava.maxkb.service.kb.MaxKbParagraphRetrieveService;
import com.litongjava.maxkb.vo.MaxKbRetrieveResult;
import com.litongjava.tio.boot.testing.TioBootTest;
import com.litongjava.tio.utils.json.JsonUtils;
public class MaxKbParagraphRetriveveServiceTest {
@Test
public void test() {
TioBootTest.runWith(AdminAppConfig.class);
Long[] datasetIdArray = { 1L };
Float similarity = 0.2f;
Integer top_n = 20;
String question = "When is the first day of Fall 2025";
// max_kb_sentence_id
MaxKbRetrieveResult search = Aop.get(MaxKbParagraphRetrieveService.class).search(datasetIdArray, similarity, top_n, question);
System.out.println(JsonUtils.toJson(search.getParagraph_list()));
}
}
说明
- 此测试类用于验证段落检索服务是否能够根据用户问题(例如“Fall 2025 的第一天是什么时候”)返回相关的段落数据。
- 检索时设置了相似度阈值(0.2f)和返回 top 20 个结果。
网页数据检索服务
在问答系统中,除了直接使用检索服务获得段落外,还需要对检索结果进行后续整合。本部分代码展示了如何将检索结果转换为 WebPageContent 对象,并合并重复段落的内容,形成完整的答案引用。
数据去重与批量查询
下面的 MaxWebsiteRetrieveService 类负责对检索得到的段落结果进行处理。主要步骤包括:
- 遍历检索结果,利用 Map 以 paragraph_id 进行去重;
- 一次性查询所有需要的段落内容;
- 根据 paragraph_id 将完整内容填充到 WebPageContent 对象中,合并重复的内容。
package com.litongjava.max.search.services;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;
import com.litongjava.db.activerecord.Db;
import com.litongjava.db.activerecord.Row;
import com.litongjava.jfinal.aop.Aop;
import com.litongjava.maxkb.service.kb.MaxKbParagraphRetrieveService;
import com.litongjava.maxkb.vo.MaxKbRetrieveResult;
import com.litongjava.maxkb.vo.ParagraphSearchResultVo;
import com.litongjava.model.web.WebPageContent;
public class MaxWebsiteRetrieveService {
public List<WebPageContent> search(String question) {
Long[] datasetIdArray = { 1L };
Float similarity = 0.2f;
Integer top_n = 10;
MaxKbRetrieveResult step = Aop.get(MaxKbParagraphRetrieveService.class).search(datasetIdArray, similarity, top_n, question);
List<ParagraphSearchResultVo> searchResults = step.getParagraph_list();
// 使用 Map 以 paragraph_id 去重(key 为 paragraph_id,value 为对应的 WebPageContent)
Map<Long, WebPageContent> mergedMap = new HashMap<>();
// 1. 收集所有需要查询的 paragraph_id(去重)
Set<Long> paragraphIds = new HashSet<>();
for (ParagraphSearchResultVo vo : searchResults) {
paragraphIds.add(vo.getParagraph_id());
}
// 2. 一次查询获取所有 paragraph 内容
Map<Long, String> paragraphContentMap = fetchParagraphContents(paragraphIds);
// 3. 遍历所有检索结果,根据 paragraph_id 从 map 中获取完整段落内容
for (ParagraphSearchResultVo vo : searchResults) {
long paragraphId = vo.getParagraph_id();
if (!mergedMap.containsKey(paragraphId)) {
WebPageContent pageContent = new WebPageContent();
pageContent.setTitle(vo.getDocument_name());
String document_url = vo.getDocument_url();
if (document_url != null && !document_url.startsWith("http")) {
pageContent.setUrl("https://" + document_url);
} else {
pageContent.setUrl(document_url);
}
pageContent.setDescription(vo.getDataset_name());
// 从批量查询的结果中获取内容
String fullContent = paragraphContentMap.get(paragraphId);
pageContent.setContent(fullContent);
mergedMap.put(paragraphId, pageContent);
} else {
// 如有重复,根据业务逻辑决定是否合并内容
WebPageContent pageContent = mergedMap.get(paragraphId);
String currentContent = pageContent.getContent();
String newContent = paragraphContentMap.get(paragraphId);
if (!currentContent.contains(newContent)) {
pageContent.setContent(currentContent + "\n" + newContent);
}
}
}
return new ArrayList<>(mergedMap.values());
}
/**
* 根据一组 paragraphId 一次性查询所有段落内容
*/
private Map<Long, String> fetchParagraphContents(Set<Long> paragraphIds) {
// 构建 IN 查询的占位符
StringBuilder sql = new StringBuilder("SELECT id, content FROM max_kb_paragraph WHERE id IN (");
StringJoiner joiner = new StringJoiner(",");
for (int i = 0; i < paragraphIds.size(); i++) {
joiner.add("?");
}
sql.append(joiner.toString()).append(")");
// 将 paragraphIds 转换为参数数组
Object[] params = paragraphIds.toArray();
// 假设 Db.find 返回的是一个包含 Record 的 List,每个 Record 有 id 和 content 属性
List<Row> records = Db.find(sql.toString(), params);
Map<Long, String> contentMap = new HashMap<>();
for (Row record : records) {
Long id = record.getLong("id");
String content = record.getStr("content");
contentMap.put(id, content);
}
return contentMap;
}
}
说明:
- 首先调用段落检索服务,获取与问题相关的段落列表。
- 利用 Set 收集所有需要查询的 paragraph_id,然后一次性查询数据库,提升性能。
- 遍历检索结果,将每个段落的内容填充到 WebPageContent 对象中,同时对重复段落进行合并处理。
合并检索结果
在上述代码中,通过 Map 来去重并合并相同 paragraph_id 的结果,确保同一网页内容不会重复显示。最后返回一个 WebPageContent 列表供后续使用。
问答流程整合
在问答系统中,需要将网页检索结果与用户对话进行整合,实现“增强式问答”。下面的 MaxChatService 类展示了如何将检索服务整合到问答流程中。代码中包括了对用户消息的处理、历史消息查询、问题重写、检索结果反馈以及根据不同 FocusMode 的处理逻辑等多个环节。
package com.litongjava.max.search.services;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import com.google.common.util.concurrent.Striped;
import com.jfinal.kit.Kv;
import com.litongjava.db.activerecord.Db;
import com.litongjava.gemini.GoogleGeminiModels;
import com.litongjava.google.search.GoogleCustomSearchResponse;
import com.litongjava.google.search.SearchResultItem;
import com.litongjava.jfinal.aop.Aop;
import com.litongjava.max.search.callback.SearchGeminiSseCallback;
import com.litongjava.max.search.callback.SearchPerplexiticySeeCallback;
import com.litongjava.max.search.can.ChatWsStreamCallCan;
import com.litongjava.max.search.consts.FocusMode;
import com.litongjava.max.search.consts.SearchTableNames;
import com.litongjava.max.search.model.MaxSearchChatMessage;
import com.litongjava.max.search.model.MaxSearchChatSession;
import com.litongjava.max.search.vo.ChatParamVo;
import com.litongjava.max.search.vo.ChatReqMessage;
import com.litongjava.max.search.vo.ChatWsReqMessageVo;
import com.litongjava.max.search.vo.ChatDeltaRespVo;
import com.litongjava.max.search.vo.CitationsVo;
import com.litongjava.max.search.vo.WebPageSource;
import com.litongjava.model.web.WebPageContent;
import com.litongjava.openai.chat.ChatMessage;
import com.litongjava.openai.chat.OpenAiChatMessage;
import com.litongjava.openai.chat.OpenAiChatRequestVo;
import com.litongjava.openai.client.OpenAiClient;
import com.litongjava.openai.consts.PerplexityConstants;
import com.litongjava.openai.consts.PerplexityModels;
import com.litongjava.template.PromptEngine;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.http.common.sse.SsePacket;
import com.litongjava.tio.utils.environment.EnvUtils;
import com.litongjava.tio.utils.json.FastJson2Utils;
import com.litongjava.tio.utils.json.JsonUtils;
import com.litongjava.tio.utils.snowflake.SnowflakeIdUtils;
import com.litongjava.tio.utils.tag.TagUtils;
import com.litongjava.tio.utils.thread.TioThreadUtils;
import com.litongjava.tio.websocket.common.WebSocketResponse;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Call;
import okhttp3.Callback;
@Slf4j
public class MaxChatService {
private static final Striped<Lock> sessionLocks = Striped.lock(1024);
private GeminiPredictService geminiPredictService = Aop.get(GeminiPredictService.class);
private MaxSearchService maxSearchService = Aop.get(MaxSearchService.class);
private MaxSearchSummaryQuestionService summaryQuestionService = Aop.get(MaxSearchSummaryQuestionService.class);
private ChatMessgeService chatMessgeService = Aop.get(ChatMessgeService.class);
private WebpageSourceService webpageSourceService = Aop.get(WebpageSourceService.class);
public boolean spped = true;
/**
* 使用搜索模型处理消息
*/
public void dispatch(ChannelContext channelContext, ChatWsReqMessageVo reqMessageVo) {
ChatReqMessage message = reqMessageVo.getMessage();
Long userId = reqMessageVo.getUserId();
Long sessionId = message.getChatId();
Long messageQuestionId = message.getMessageId();
String content = message.getContent();
ChatParamVo chatParamVo = new ChatParamVo();
// create chat or save message
String focusMode = reqMessageVo.getFocusMode();
if (!Db.exists(SearchTableNames.max_search_chat_session, "id", sessionId)) {
Lock lock = sessionLocks.get(sessionId);
lock.lock();
try {
TioThreadUtils.execute(() -> {
String summary = summaryQuestionService.summary(content);
new MaxSearchChatSession().setId(sessionId).setUserId(userId).setTitle(summary).setFocusMode(focusMode).save();
});
} finally {
lock.unlock();
}
}
// query history
List<ChatMessage> history = chatMessgeService.getHistoryById(sessionId);
chatParamVo.setHistory(history);
if (content.length() > 30 || history.size() > 0) {
if (!content.startsWith("Summary: http")) {
String rewrited = Aop.get(RewriteQuestionService.class).rewrite(content, history);
log.info("rewrite to:{}", rewrited);
chatParamVo.setRewrited(rewrited);
if (channelContext != null) {
Kv end = Kv.by("type", "rewrited").set("content", rewrited);
byte[] jsonBytes = FastJson2Utils.toJSONBytes(end);
if (reqMessageVo.isSse()) {
Tio.bSend(channelContext, new SsePacket(jsonBytes));
} else {
Tio.bSend(channelContext, WebSocketResponse.fromBytes(jsonBytes));
}
}
}
}
// save user mesasge
new MaxSearchChatMessage().setId(messageQuestionId).setChatId(sessionId)
//
.setRole("user").setContent(content).save();
String from = channelContext.getString("FROM");
chatParamVo.setFrom(from);
Boolean copilotEnabled = reqMessageVo.getCopilotEnabled();
Call call = null;
long answerMessageId = SnowflakeIdUtils.id();
chatParamVo.setAnswerMessageId(answerMessageId);
log.info("focusMode:{},{}", userId, focusMode);
if (FocusMode.webSearch.equals(focusMode)) {
call = maxSearchService.search(channelContext, reqMessageVo, chatParamVo);
} else if (FocusMode.rag.equals(focusMode)) {
MaxWebSiteAugmentedService maxRetrieveService = Aop.get(MaxWebSiteAugmentedService.class);
call = maxRetrieveService.index(channelContext, reqMessageVo, chatParamVo);
} else if (FocusMode.translator.equals(focusMode)) {
String inputPrompt = Aop.get(TranslatorPromptService.class).genInputPrompt(channelContext, content, copilotEnabled, messageQuestionId, messageQuestionId, from);
chatParamVo.setSystemPrompt(inputPrompt);
call = geminiPredictService.predict(channelContext, reqMessageVo, chatParamVo);
} else if (FocusMode.deepSeek.equals(focusMode)) {
Aop.get(DeepSeekPredictService.class).predict(channelContext, reqMessageVo, chatParamVo);
} else if (FocusMode.mathAssistant.equals(focusMode)) {
String inputPrompt = PromptEngine.renderToString("math_assistant_prompt.txt");
chatParamVo.setSystemPrompt(inputPrompt);
Aop.get(DeepSeekPredictService.class).predict(channelContext, reqMessageVo, chatParamVo);
} else if (FocusMode.writingAssistant.equals(focusMode)) {
String inputPrompt = PromptEngine.renderToString("writing_assistant_prompt.txt");
chatParamVo.setSystemPrompt(inputPrompt);
Aop.get(DeepSeekPredictService.class).predict(channelContext, reqMessageVo, chatParamVo);
} else {
// 5. 向前端通知一个空消息,标识搜索结束,开始推理
//{"type":"message","data":"", "messageId": "32fcbbf251337c"}
ChatDeltaRespVo<String> chatVo = ChatDeltaRespVo.message(answerMessageId, "");
byte[] jsonBytes = FastJson2Utils.toJSONBytes(chatVo);
if (channelContext != null) {
if (reqMessageVo.isSse()) {
Tio.bSend(channelContext, new SsePacket(jsonBytes));
} else {
Tio.bSend(channelContext, new WebSocketResponse(jsonBytes));
}
}
chatVo = ChatDeltaRespVo.message(answerMessageId, "Sorry Developing");
jsonBytes = FastJson2Utils.toJSONBytes(chatVo);
if (channelContext != null) {
if (reqMessageVo.isSse()) {
Tio.bSend(channelContext, new SsePacket(jsonBytes));
} else {
Tio.bSend(channelContext, new WebSocketResponse(jsonBytes));
}
Kv end = Kv.by("type", "messageEnd").set("messageId", answerMessageId);
jsonBytes = FastJson2Utils.toJSONBytes(end);
if (reqMessageVo.isSse()) {
Tio.bSend(channelContext, new SsePacket(jsonBytes));
} else {
Tio.bSend(channelContext, new WebSocketResponse(jsonBytes));
}
}
}
if (call != null) {
ChatWsStreamCallCan.put(sessionId.toString(), call);
}
}
}
说明:
- 消息处理与历史查询
根据用户发送的消息和会话 ID 查询历史对话记录,并在必要时生成新会话。 - 问题重写
若消息内容较长或存在历史对话,则调用 RewriteQuestionService 对问题进行重写,提升搜索准确性,并将重写后的内容反馈给前端。 - 检索与反馈
根据 FocusMode 的不同(例如 webSearch、rag、translator 等),调用不同的检索或预测服务,并将搜索结果以 JSON 格式保存到数据库,同时通过 WebSocket 或 SSE 将部分结果返回给前端。 - 后续推理
若无匹配模式,则直接通知前端搜索结束,等待后续推理处理。
时间
2025-04-06 20:22:57.943 [t-io-1] INFO c.l.m.s.k.MaxKbParagraphRetrieveService.search0:41 - search_paragraph:498681673099689984,[1],0.2,10,10
2025-04-06 20:22:57.957 [t-io-1] INFO c.l.m.s.s.MaxRagService.index:62 - retrived 学校有那些部门 elapsed:2810(ms)
2025-04-06 20:23:12.313 [OkHttp https://generativelanguage.googleapis.com/...] INFO c.l.m.s.c.SearchGeminiSseCallback.onResponse:113 - finish llm in 20000 (ms),tokens:{"promptTokenCount":12924,"candidatesTokenCount":1468,"totalTokenCount":14392}
体验地址
https://www.kapiolani.ai/
总结
本文档详细描述了基于已预处理好的网页数据进行向量检索以及如何将检索结果整合到问答流程中的实现方式。首先通过单元测试验证了段落检索服务,接着利用 MaxWebsiteRetrieveService 对检索结果进行去重、批量查询与内容合并,最终在 MaxChatService 中将检索结果、问题重写、历史消息及多种 FocusMode 的处理逻辑有机整合,形成一个完整的增强式问答系统。整个流程不仅保证了检索的高效性,也为后续大模型的回答提供了丰富的上下文信息,从而提升了系统整体的问答效果。
以上即为网页数据检索与问答流程整合的完整文档,所有代码均已完整保留,并按照良好的组织结构重新排序和说明。