课件说明 链接到标题
核心内容:
✅ 理解结构化数据RAG与非结构化数据的区别
✅ 小数据量高效方案 - PandasQueryEngine
✅ 向量检索+BM25基准方案及其局限性
✅ Text-to-SQL基础与高级优化技术
✅ 混合检索策略
✅ 智能查询路由
✅ 生产部署优化方案
预期效果:
数值查询:支持高精度计算与聚合,结果精准
语义查询:具备自然语言理解能力,召回准确
平均响应时间:1-2秒
注意:
- 本节课属于RAG进阶篇,如果没有基础的同学可以先去看RAG入门篇。
第一章:核心概念与痛点分析 链接到标题
一、 结构化数据 vs 非结构化数据 链接到标题
在企业中,数据往往以结构化形式存在于关系型数据库、业务系统、报表等,而 RAG 技术最初设计用来处理非结构化文本检索(如文件、文档、知识库)。当需要把结构化数据引入 RAG 管道时,传统的文本向量检索方式会遭遇一系列挑战,这使得企业在部署基于结构化数据的 RAG 系统时变得复杂且不稳定。以下就是这些主要难点及其背景分析。
| 维度 | 非结构化数据RAG | 结构化数据RAG |
|---|---|---|
| 数据形式 | 文档、PDF、网页 | 数据库表、CSV、Excel |
| 查询示例 | “解释量子计算” | “过去30天销售额?” |
| 检索方式 | 向量相似度 | SQL查询 + 向量检索 |
| 核心挑战 | 语义理解 | 精确计算、聚合 |
| 主要技术 | Embedding + 向量库 | Text-to-SQL + 混合检索 |
- 核心痛点
向量检索无法精确聚合 - “过去30天销售额"无法通过向量计算
Text-to-SQL缺乏语义理解 - “推荐适合的产品"SQL无法理解
单一方案覆盖不全 - 同时需要精确计算和语义理解
性能与成本平衡难 - 准确率、速度、成本的三角矛盾
1. 结构化数据与语义向量空间不匹配 链接到标题
大多数 RAG 实现依赖向量检索技术(embedding + 向量数据库)对文本进行相似度搜索。这对非结构化文本有效,但对表格/结构化数据效果差强人意:
表格数据的行列结构、数值字段、层次关系等语义在向量空间中往往被“扁平化”,导致 embedding 丢失结构信息。
在把表格拆为“自然语言段落”再向量化时,会丢失列与行之间的本质关系,使检索结果难以准确匹配真实结构语义。
向量模型并不擅长捕捉关系型数据的精确筛选条件,如数值范围、日期范围和条件逻辑(比如 SQL WHERE 子句,“过去30天销售额”)。
这类问题直接影响检索质量与后续生成的准确性,是结构化数据 RAG 最大的难点之一。
2. 查询语义对结构化数据的不匹配 链接到标题
用户往往以自然语言提问(例如 “过去三年中欧盟 vs 美国赢得的法律案件数对比?”),而结构化数据一般需要通过 SQL 或逻辑分析表达:
RAG 传统检索不具备关系型查询能力,难以准确把语言查询转换为结构化查询(如 SQL)。
即使结合 LLM 做自然语言到 SQL 的转换,也存在生成错误、表名/字段拼写错误或语义误解等问题。
多表 join、聚合函数等复杂操作,对 LLM 推理要求更高且容易出现错误。
这会导致检索结果不准确或可执行查询失败。
3. 数据规模与实时性压力 链接到标题
企业结构化数据量通常很大,并且更新频繁:
实时或近实时更新数据仓库/湖难以及时同步到向量数据库或传统检索库。
向量数据库在大规模数据下需要高性能索引(如 HNSW / IVF),但这些不一定对结构化条件过滤友好。
查询性能和延迟是实时交互场景中的重要指标,但大数据规模会压制检索速度,造成用户体验不佳。
这就要求复杂的数据管道和高性能基础设施。
4.复杂数据预处理与特征提取成本高 链接到标题
为让结构化数据适配 RAG,需要很多额外处理:
提取 schema 信息(表/列元数据)并构造描述性文本或 embeddings。
用额外 dual-encoder、cross-encoder 进行表格 reranking。
为数值字段构建 binning 或专门 embedding。
模式混合的表格、分类编码、缺失值处理等。
这些预处理步骤往往需要 ML 专业知识与工程投入。
下表总结结构化数据在 RAG 过程中的关键难点:
| 难点 | 本质问题 | 对系统影响 |
|---|---|---|
| 向量化语义丢失 | 向量 embedding 无法表示复杂表格关系 | 检索结果不准确 |
| 自然语言查询转换困难 | 语言与关系模型语义不匹配 | 生成错误 SQL 或无效查询 |
| 大规模实时更新 | 数据同步与索引维护压力大 | 延迟高、结果不及时 |
| 结构丢失与逻辑混乱 | 扁平文本丧失字段间依赖关系 | 回答错误、高幻觉 |
| 数据预处理与工程复杂 | 需要大量特征工程与 schema 建模 | 增加开发成本 |
在企业RAG落地中,结构化数据之所以成为“硬骨头”,是因为它试图用概率性的模型(LLM/向量)去解决确定性的逻辑问题(SQL/计算)。
核心结论与建议:
不要试图全部向量化:对于强结构化数据,Text-to-SQL 或 Text-to-Pandas 仍是首选,但必须配合人工精修的Schema描述(给LLM看的数据库字典)。
采用“路由架构”(Router):在RAG前端增加意图识别。如果是定性问题(“某产品的评价如何”),走向量检索;如果是定量问题(“某产品库存多少”),走SQL查询。
权限前置:不要依赖向量库做权限,而是在检索前就通过元数据过滤掉用户无权访问的数据范围。
结构化数据 RAG 的关键不在“模型能力”,而在“系统设计能力”。
LLM ≠ 数据库
RAG ≠ 万能查询引擎
企业级系统必须:
控制数据边界
显式建模语义
解耦推理与执行
第二章:环境准备 链接到标题
# 查看 Python 版本
!python --version
# 查看 Llama-Index 版本
!pip list | grep llama-index
# 安装依赖
!pip install -q llama-index-core llama-index-llms-openai llama-index-embeddings-openai llama-index-llms-dashscope llama-index-embeddings-dashscope
!pip install -q llama-index-vector-stores-milvus llama-index-retrievers-bm25 rank-bm25 llama-parse
!pip install -q sqlalchemy pandas python-dotenv
print("✅ 依赖安装完成")
# 全局配置
import os
import warnings
from dotenv import load_dotenv
from llama_index.llms.dashscope import DashScope
from llama_index.embeddings.dashscope import DashScopeEmbedding
from llama_index.core.settings import Settings
warnings.filterwarnings('ignore')
# 加载环境变量
load_dotenv(override=True)
# 设置 LLM
Settings.llm = DashScope(
model_name="qwen-max",
api_key=os.getenv("DASHSCOPE_API_KEY"),
api_base=os.getenv("DASHSCOPE_BASE_URL", "https://api.dashscope.aliyuncs.com")
)
# 设置 Embedding模型
Settings.embed_model = DashScopeEmbedding(
model_name="text-embedding-v4",
api_key=os.getenv("DASHSCOPE_API_KEY"),
api_base=os.getenv("DASHSCOPE_BASE_URL", "https://api.dashscope.aliyuncs.com")
)
print("✅ LLM配置完成")
print(f" 模型: {Settings.llm.model_name}")
print(f" Embedding: {Settings.embed_model.model_name}")
# 基本使用
response = Settings.llm.complete("你好,请介绍一下通义千问。")
print(response)
response_embed = Settings.embed_model.get_query_embedding("你好,请介绍一下通义千问。")
print(response_embed)
1. Markdown文档中的表格检索方式 链接到标题
1.数据解析 (Parsing Phase) 目标:把“混沌”的 Markdown 文档拆解成此“条理分明”的结构化节点。
输入:原始 Markdown 文件(包含文本、标题、表格)。
工具:MarkdownElementNodeParser。
动作:
扫描:扫描全文,识别 |—|—| 格式的表格。
提取表格 (Object Extraction):
将识别到的表格从原文中“抠”出来,转换成 TableNode(本质是 DataFrame 结构,或者保留原始 Markdown 表格文本)。
这部分内容不直接切片进入向量库,因为它包含了复杂的结构,直接切片会破坏语义。
生成摘要 (Summary Generation):
关键一步:Parser 会调用 LLM(如 GPT-3.5),给每个提取出来的表格生成一段自然语言摘要(Summary)。
例子:LLM 生成“这是一个关于公司各部门2023年Q3财务数据的表格,包含研发、销售部的收入和成本。”
构建基座节点 (Base Nodes):
将剩余的普通正文切片成 TextNode。
创建一个 IndexNode,内容是上面的表格摘要,并带有一个指针(index_id)指向那个完整的 TableNode。
import os
import pandas as pd
from llama_index.core import Document, VectorStoreIndex
from llama_index.core.node_parser import MarkdownElementNodeParser
from llama_index.core.retrievers import RecursiveRetriever
from llama_index.llms.openai import OpenAI
from llama_index.core.schema import TextNode, IndexNode # 确保导入这两个类
# 0. 准备测试数据 (确保表格格式标准)
markdown_content = """
# 公司季度财报
以下是我们在2023年第三季度的详细财务表现:
| 部门 | 收入(万元) | 成本(万元) | 利润(万元) |
|------|------------|------------|------------|
| 研发部 | 0 | 500 | -500 |
| 销售部 | 2000 | 800 | 1200 |
| 产品部 | 500 | 200 | 300 |
总结来看,销售部表现最为亮眼。
"""
doc = Document(text=markdown_content)
# 1. 创建解析器
node_parser = MarkdownElementNodeParser(
llm=Settings.llm,
num_workers=1 # 并行解析数
)
# 2. 正确的解析流程
# 第一步: 从文档获取原始节点
raw_nodes = node_parser.get_nodes_from_documents([doc])
# 第二步: 从节点中提取基座节点和表格对象
base_nodes, objects = node_parser.get_nodes_and_objects(raw_nodes)
print(f"解析完成: 生成了 {len(base_nodes)} 个基座节点, {len(objects)} 个表格对象")
# 3. 验证表格对象
print(f"\nobjects 类型: {type(objects)}")
if objects:
print("\n✅ 表格对象提取成功!")
for i, obj in enumerate(objects):
print(f" 对象 {i}:")
print(f" 类型: {type(obj).__name__}")
print(f" ID: {obj.node_id if hasattr(obj, 'node_id') else 'N/A'}")
# 如果是表格节点,尝试显示内容
if hasattr(obj, 'text'):
print(f" 内容预览: {obj.text[:100]}...")
if hasattr(obj, 'metadata'):
print(f" 元数据: {obj.metadata}")
else:
print("\n⚠️ 未提取到表格,调试信息:")
for i, node in enumerate(raw_nodes):
print(f" 节点 {i}: {type(node).__name__} - {node.text[:60]}...")
- 检索与生成 (Retrieval & Generation Phase) 目标:用户提问,系统精准找到表格并回答。
工具:RecursiveRetriever (递归检索器)。
流程:
第一级检索 (Level-1 Retrieval):
用户问:“销售部门去年的利润怎么样?”
检索器在 Vector Store 中搜索。
由于表格摘要里有“…销售部的收入和成本…”,语义匹配度很高,检索器找回了这个 IndexNode(表格摘要)。
递归解析 (Recursive Step):
RecursiveRetriever 拿到 IndexNode 后,发现它手里捏着一个 index_id。
它根据这个 ID 去 DocStore (objects) 里抓取完整的表格对象。
现在,检索器手里拿到了完整的 Markdown 表格或 DataFrame。
上下文组装 (Synthesis):
如果检索到了其他普通 TextNode(正文),也一并带上。
最终的 Prompt 变成了:
[上下文] 1. (正文片段) 公司Q3战略重点是... 2. (表格内容) | 部门 | 收入 | ... | 销售部 | 2000 | ... [问题] 销售部门去年的利润怎么样?
LLM 生成:
- LLM 看到完整的表格数据,轻松计算或提取出“1200万元”,生成最终回答。
# 4. 过滤 base_nodes (只保留 TextNode 和 IndexNode)
base_nodes_filtered = [n for n in raw_nodes if isinstance(n, (TextNode, IndexNode))]
print(f"\n过滤后基座节点数: {len(base_nodes_filtered)}")
# 5. 建立向量索引 (只对摘要建索引)
print("\n开始建立向量索引...")
vector_index = VectorStoreIndex(nodes=base_nodes_filtered)
vector_retriever = vector_index.as_retriever(similarity_top_k=2)
# 6. 构建递归检索器
# 将 objects 列表转换为字典 {node_id: node}
objects_dict = {obj.node_id: obj for obj in objects}
recursive_retriever = RecursiveRetriever(
"vector",
retriever_dict={"vector": vector_retriever},
node_dict=objects_dict, # 传入表格对象字典
verbose=True # 开启日志,可以看到递归过程
)
print("\n✅ 递归检索器构建完成!")
# 7. 执行检索测试
print("\n" + "="*50)
print("开始检索测试")
print("="*50)
query = "销售部的利润是多少?"
print(f"\n查询: {query}")
retrieved_nodes = recursive_retriever.retrieve(query)
print(f"\n检索到 {len(retrieved_nodes)} 个结果:")
for i, node_with_score in enumerate(retrieved_nodes):
print(f"\n--- 结果 {i+1} (相关度: {node_with_score.score:.3f}) ---")
print(f"节点类型: {type(node_with_score.node).__name__}")
print(f"内容:\n{node_with_score.node.text[:300]}...")
# 8. 结合 LLM 生成最终答案
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core import Settings
query_engine = RetrieverQueryEngine.from_args(
recursive_retriever,
llm=Settings.llm
)
print("\n" + "="*50)
print("生成最终答案")
print("="*50)
response = query_engine.query(query)
print(f"\n问题: {query}")
print(f"\n答案: {response}")
print(f"\n来源节点数: {len(response.source_nodes)}")
2. PDF文档中表格解析检索 链接到标题
核心架构:LlamaParse + MarkdownElementNodeParser
实现思路
PDF → Markdown 转换:使用 LlamaParse(LlamaIndex 官方的 PDF 解析工具)将 PDF 转换为结构化的 Markdown 格式,表格会被精准保留为 Markdown 表格
Markdown → 节点拆分:使用 MarkdownElementNodeParser 提取表格并生成摘要
递归检索:与之前 Markdown 案例完全一致
重要说明
LlamaParse API Key 需要在 https://cloud.llamaindex.ai 注册账号 免费版有每日解析页数限制(通常1000页/天) 将 API Key 替换代码中的 “llx-…”
成本优化 LlamaParse 是付费服务,如果文档较大建议先测试几页 可以通过 pages=[1, 2, 3] 参数只解析指定页
备选方案 (如果不想用 LlamaParse) 使用开源工具如 pdfplumber 或 camelot 提取表格,然后手工转换为 Markdown 但这种方式对复杂表格(跨行跨列)的处理效果通常不如 LlamaParse

import os
from llama_index.core import Document, VectorStoreIndex, Settings
from llama_index.core.node_parser import MarkdownElementNodeParser
from llama_index.core.retrievers import RecursiveRetriever
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.schema import TextNode, IndexNode
from llama_parse import LlamaParse
from dotenv import load_dotenv
load_dotenv(override=True)
# ========== 步骤 0: 环境配置 ==========
# 1. 安装依赖: pip install llama-parse llama-index
# 2. 获取 API Key: https://cloud.llamaindex.ai (需要注册 LlamaCloud 账号)
# 3. 设置环境变量LLAMA_CLOUD_API_KEY
# os.environ["LLAMA_CLOUD_API_KEY"] = "llx-..."
# ========== 步骤 1: 使用 LlamaParse 解析 PDF ==========
print("📄 开始解析 PDF...")
parser = LlamaParse(
result_type="markdown", # 输出格式为 Markdown (保留表格结构)
verbose=True,
language="ch_sim", # 简体中文
# 可选: 提供自定义的解析指令
# parsing_instruction="重点提取财务数据表格,保留表头和数值"
)
# 解析 PDF 文件
pdf_path = "dataset/招股意向书.pdf"
# 添加错误处理
try:
documents = parser.load_data(pdf_path)
print(f"✅ PDF 解析完成,共生成 {len(documents)} 个文档片段")
# 检查是否为空
if not documents:
print("⚠️ 警告: 解析结果为空!")
print("可能的原因:")
print("1. PDF 文件路径不正确")
print("2. LlamaParse API 调用失败")
print("3. API Key 未正确配置")
print(f"\n请检查:")
print(f"- 文件是否存在: {os.path.exists(pdf_path)}")
print(f"- API Key 是否已设置: {bool(os.getenv('LLAMA_CLOUD_API_KEY'))}")
else:
print(f"预览第一个片段:\n{documents[0].text[:300]}...\n")
except Exception as e:
print(f"❌ PDF 解析失败: {e}")
print("\n调试信息:")
print(f"- 文件路径: {pdf_path}")
print(f"- 文件存在: {os.path.exists(pdf_path)}")
print(f"- API Key 已设置: {bool(os.getenv('LLAMA_CLOUD_API_KEY'))}")
raise
# ========== 步骤 2: 使用 MarkdownElementNodeParser 提取表格 ==========
print("🔍 开始提取表格...")
node_parser = MarkdownElementNodeParser(
llm=Settings.llm, # 用于生成表格摘要
num_workers=4 # 并行处理
)
# 从文档获取原始节点
raw_nodes = node_parser.get_nodes_from_documents(documents)
# 提取基座节点和表格对象
base_nodes, objects = node_parser.get_nodes_and_objects(raw_nodes)
print(f"✅ 提取完成:")
print(f" - 基座节点(文本+摘要): {len(base_nodes)} 个")
print(f" - 表格对象(完整表格): {len(objects)} 个\n")
# 调试:查看提取到的表格
if objects:
print("📊 表格对象示例:")
for i, obj in enumerate(objects[:3]): # 只显示前3个
print(f" 表格 {i+1}:")
print(f" ID: {obj.node_id}")
print(f" 内容预览: {obj.text[:100]}...\n")
# ========== 步骤 3: 构建索引和递归检索器 ==========
print("🔧 构建检索系统...")
# 过滤节点(只保留 TextNode 和 IndexNode)
base_nodes_filtered = [n for n in base_nodes if isinstance(n, (TextNode, IndexNode))]
# 建立向量索引
vector_index = VectorStoreIndex(nodes=base_nodes_filtered)
vector_retriever = vector_index.as_retriever(similarity_top_k=5)
# 构建 objects 字典
objects_dict = {obj.node_id: obj for obj in objects}
# 构建递归检索器
recursive_retriever = RecursiveRetriever(
"vector",
retriever_dict={"vector": vector_retriever},
node_dict=objects_dict,
verbose=True
)
# 构建查询引擎
query_engine = RetrieverQueryEngine.from_args(
recursive_retriever,
llm=Settings.llm
)
print("✅ 检索系统构建完成!\n")
# ========== 步骤 4: 测试查询 ==========
print("="*60)
print("开始测试查询")
print("="*60)
# 示例查询(根据招股意向书的实际内容调整)
test_queries = [
"体外诊断项目中,“输血项目”的检测范围和用途是什么?",
]
for query in test_queries:
print(f"\n问题: {query}")
response = query_engine.query(query)
print(f"答案: {response}\n")
print("-" * 60)
第三章:PandasQueryEngine - 小数据量高效方案 链接到标题
在处理小于1万行的结构化数据时,PandasQueryEngine 是最直接高效的解决方案:
- ✅ 零索引成本: 无需向量化,直接操作内存中的 DataFrame
- ✅ 精准计算: LLM 生成 Pandas 代码,计算100%准确
- ✅ 快速迭代: 适合探索性数据分析
- ⚠️ 适用场景: 数据量 < 1万行,单表或少量表
3.1 入门:基础查询 链接到标题
环境准备与数据加载 链接到标题
import pandas as pd
import time
from typing import Dict, Any, List
from llama_index.experimental.query_engine import PandasQueryEngine
from llama_index.llms.openai import OpenAI
from llama_index.core import Settings
import logging
# 禁用 Phoenix/OpenInference 的错误日志,在后续有提到的监控工具
logging.getLogger("openinference.instrumentation.llama_index").setLevel(logging.CRITICAL)
# 加载产品数据
df = pd.read_csv("dataset/products.csv")
print(f"📊 数据加载成功: {len(df)} 行 x {len(df.columns)} 列")
print(f"\n列名: {list(df.columns)}")
print("\n数据预览:")
df.head()
创建 PandasQueryEngine 链接到标题
PandasQueryEngine 的核心参数:
| 参数 | 说明 |
|---|---|
df | 要查询的 DataFrame |
verbose | 是否显示生成的 Pandas 代码 |
instruction_str | 指导 LLM 如何生成代码 |
synthesize_response | 是否用 LLM 综合最终响应 |
# 优化的指令字符串 - 关键配置
instruction_str = (
"1. 将查询转换为可执行的 Pandas 代码\n"
"2. 代码必须是单行表达式,可以被 eval() 执行\n"
"3. DataFrame 变量名为 'df'\n"
"4. 只输出表达式本身,不要有其他文字\n"
"5. 不要使用引号包裹表达式\n"
"\n示例:\n"
"问题: 找出价格最高的产品\n"
"代码: df.loc[df['price'].idxmax()]['name']\n"
)
# 创建查询引擎
query_engine = PandasQueryEngine(
df=df, # DataFrame读取的数据
verbose=True, # 显示生成的代码
instruction_str=instruction_str,
synthesize_response=True # 综合回答
)
print("✅ PandasQueryEngine 初始化完成")
# 场景 1: 简单聚合查询
query = "电子产品类别有多少个产品?"
print(f"📝 查询: {query}")
response = query_engine.query(query)
print(f"💬 回答: {response}")
# 场景 2: 排序和筛选
query = "列出价格(price)最高的5个产品,显示名称和价格"
print(f"\n📝 查询: {query}")
response = query_engine.query(query)
print(f"💬 回答: {response}")
# 展示Pandas查询结果
df[['name', 'price']].sort_values(by='price', ascending=False).head(5)
3.2 进阶:复杂查询与性能监控 链接到标题
在生产环境中需要:
- 🔄 错误重试: 处理 LLM 偶发错误
- ⏱️ 性能监控: 跟踪查询耗时
- 📊 统计分析: 了解查询成功率
class AdvancedPandasQueryEngine:
"""增强的 Pandas 查询引擎 - 带错误处理和性能监控"""
def __init__(self, df: pd.DataFrame, llm_model: str = "gpt-4o-mini"):
self.df = df
self.llm = OpenAI(model=llm_model, temperature=0)
# 优化的指令(中文提示效果更好)
instruction_str = (
"你是数据分析专家,将自然语言转换为 Pandas 代码。\n"
"要求:\n"
"1. 生成单行可 eval() 执行的表达式\n"
"2. DataFrame 变量名为 'df'\n"
"3. 只输出代码,无其他文字\n"
"4. 优先使用向量化操作\n"
)
self.query_engine = PandasQueryEngine(
df=df, verbose=True,
instruction_str=instruction_str,
synthesize_response=True
)
self.query_history = []
def query(self, query_str: str, max_retries: int = 2) -> Dict[str, Any]:
"""执行查询,带重试和性能记录"""
start_time = time.time()
for attempt in range(max_retries + 1):
try:
logger.info(f"执行查询 (尝试 {attempt + 1}): {query_str}")
response = self.query_engine.query(query_str)
execution_time = time.time() - start_time
self.query_history.append({
"query": query_str,
"time": execution_time,
"success": True
})
return {"success": True, "result": str(response), "time": execution_time}
except Exception as e:
if attempt == max_retries:
self.query_history.append({
"query": query_str,
"time": time.time() - start_time,
"success": False,
"error": str(e)
})
return {"success": False, "error": str(e)}
def get_stats(self) -> Dict[str, Any]:
"""获取性能统计"""
if not self.query_history:
return {"message": "没有查询历史"}
successful = [q for q in self.query_history if q["success"]]
return {
"total": len(self.query_history),
"success": len(successful),
"success_rate": f"{len(successful)/len(self.query_history)*100:.1f}%",
"avg_time": f"{sum(q['time'] for q in successful)/len(successful):.2f}s" if successful else "N/A"
}
# 初始化高级引擎
advanced_engine = AdvancedPandasQueryEngine(df)
print("✅ AdvancedPandasQueryEngine 初始化完成")
# 进阶查询示例
advanced_queries = [
"计算每个品牌(brand)的产品数量,按数量降序排列",
"找出评分(rating)高于4.5的产品名称和价格",
"显示各类别(category)的平均价格"
]
print("=" * 60)
print("📊 进阶查询测试")
print("=" * 60)
for query in advanced_queries:
print(f"\n📝 查询: {query}")
result = advanced_engine.query(query)
if result["success"]:
print(f"💬 回答: {result['result']}")
print(f"⏱️ 耗时: {result['time']:.2f}s")
else:
print(f"❌ 错误: {result['error']}")
# 显示统计
print("\n" + "=" * 60)
print("📈 性能统计:")
stats = advanced_engine.get_stats()
for k, v in stats.items():
print(f" {k}: {v}")
3.3 高阶:Chain of Tables(表格思维链) 链接到标题
普通的 Text-to-SQL(或 Text-to-Pandas)往往试图用一句超级复杂的 SQL/代码解决所有问题,这很容易出错(LLM 手滑写错代码)。对于复杂的多步骤分析,使用表格思维链技术:
原始表格 → [筛选] → 中间表1 → [聚合] → 中间表2 → [比较] → 最终结果
核心思想:
- 分解复杂问题 为简单步骤
- 逐步执行 每个操作
- 保留中间结果 便于调试
- 记录操作历史 可追溯
核心实现:
这个实现是一个人机协同的 Chain-of-Table:人(或上层 Agent):负责规划思维链(High-level reasoning),决定先 filter再 aggregate。
Python 类:负责维护中间状态(State Management),传递 current_df。
LLM:负责执行具体的原子操作(Atomic Execution),即把自然语言转成 Pandas 代码。
class ChainOfTablesEngine:
"""表格思维链引擎 - 多步骤推理(修正版)"""
def __init__(self, df: pd.DataFrame):
self.original_df = df.copy() # 保存原始数据
self.current_df = df.copy() # 当前工作表
self.operation_history = [] # 操作历史
self.llm = Settings.llm # LLM
def filter(self, condition: str) -> pd.DataFrame:
"""筛选操作 - 真正更新 DataFrame"""
print(f"[筛选] {condition}")
rows_before = len(self.current_df)
# 构造prompt,让LLM生成筛选代码
prompt = f"""
你是一位 Pandas 专家。用户想要筛选数据。
当前 DataFrame 的列名是: {list(self.current_df.columns)}
用户的筛选条件是: {condition}
请生成一行 Python 代码来完成筛选,要求:
1. 变量名必须是 'df'
2. 返回筛选后的 DataFrame
3. 只输出代码,不要有任何解释
示例: df[df['price'] > 5000]
"""
# 调用LLM生成代码
response = self.llm.complete(prompt)
generated_code = response.text.strip()
# 清理代码(去掉可能的markdown标记)
if generated_code.startswith("```"):
generated_code = generated_code.split("\n")[1]
if generated_code.endswith("```"):
generated_code = generated_code.rsplit("\n", 1)[0]
generated_code = generated_code.strip()
print(f"生成的代码: {generated_code}")
# 执行代码并更新 current_df
try:
df = self.current_df # 为了代码能找到 df 变量
filtered_df = eval(generated_code) # 执行代码
self.current_df = filtered_df # 更新当前工作表
print(f"✅ 筛选成功: {rows_before} 行 → {len(self.current_df)} 行")
except Exception as e:
print(f"❌ 代码执行失败: {e}")
print(f"保持原数据不变")
# 记录操作
self.operation_history.append({
'step': len(self.operation_history) + 1,
'operation': 'FILTER',
'description': condition,
'code': generated_code,
'rows': f"{rows_before} → {len(self.current_df)}"
})
return self.current_df
def aggregate(self, aggregation: str) -> str:
"""聚合操作 - 基于当前表计算"""
print(f"[聚合] {aggregation}")
# 构造prompt
prompt = f"""
你是一位 Pandas 专家。
当前 DataFrame 有 {len(self.current_df)} 行数据,列名是: {list(self.current_df.columns)}
用户想要: {aggregation}
请生成一行 Python 代码来完成计算,要求:
1. 变量名必须是 'df'
2. 返回计算结果(数值或Series)
3. 只输出代码,不要解释
示例: df['rating'].mean()
"""
response = self.llm.complete(prompt) # 调用LLM生成代码
generated_code = response.text.strip() # 去除前后的空格
# 清理代码
if generated_code.startswith("```"):
generated_code = generated_code.split("\n")[1]
if generated_code.endswith("```"):
generated_code = generated_code.rsplit("\n", 1)[0]
generated_code = generated_code.strip()
print(f"生成的代码: {generated_code}")
# 执行计算
try:
df = self.current_df
result = eval(generated_code)
print(f"✅ 计算结果: {result}")
except Exception as e:
result = f"计算失败: {e}"
print(f"❌ {result}")
# 记录操作
self.operation_history.append({
'step': len(self.operation_history) + 1,
'operation': 'AGGREGATE',
'description': aggregation,
'code': generated_code,
'result': str(result)[:100]
})
return str(result)
def reset(self):
"""重置到原始表"""
print("🔄 [重置] 恢复原始表格")
self.current_df = self.original_df.copy()
self.operation_history = []
def show_history(self):
"""显示操作历史"""
print("\n📜 操作历史:")
print("-" * 60)
for op in self.operation_history:
print(f" 步骤 {op['step']}: {op['operation']}")
print(f" 描述: {op['description']}")
print(f" 代码: {op.get('code', 'N/A')}")
if 'rows' in op:
print(f" 行数变化: {op['rows']}")
if 'result' in op:
print(f" 结果: {op['result']}")
print()
# 重新初始化
cot_engine = ChainOfTablesEngine(df)
print("✅ ChainOfTablesEngine 初始化完成(修正版)")
print("=" * 60)
print("🔗 Chain of Tables 实战示例")
print("=" * 60)
# 场景: 分析高端产品
print("\n📌 任务: 找出高价产品的平均评分")
print("思维链: 筛选(价格>5000) → 聚合(平均评分)")
# 步骤1: 筛选
print("\n" + "-" * 40)
cot_engine.filter("价格(price)大于5000")
# 步骤2: 聚合
print("\n" + "-" * 40)
result = cot_engine.aggregate("计算评分(rating)的平均值")
print(f"\n🎯 最终结果: {result}")
# 显示完整历史
cot_engine.show_history()
# 重置并执行另一个分析
print("\n" + "=" * 60)
print("📌 任务: 各类别产品数量排名")
print("思维链: 分组 → 计数 → 排序")
print("=" * 60)
cot_engine.reset()
result = cot_engine.aggregate("按类别(category)分组,计算每个类别的产品数量,按降序排列")
print(f"\n🎯 结果: {result}")
cot_engine.show_history()
# 数据验证
df1 = df[df['price'] > 5000]
df1
df1['rating'].mean()
# 将按类别(category)分组,计算每个类别的产品数量,按降序排列,第二个任务的结果获取出
query_result = df.groupby('category').size().sort_values(ascending=False)
query_result
#可以通过matplotlib来对生成的Pandas数据框进行可视化
import matplotlib.pyplot as plt
import platform
# 1. 解决中文显示问题
system_name = platform.system()
if system_name == 'Darwin': # macOS
plt.rcParams['font.sans-serif'] = ['Arial Unicode MS']
elif system_name == 'Windows': # Windows
plt.rcParams['font.sans-serif'] = ['SimHei']
else: # Linux
plt.rcParams['font.sans-serif'] = ['WenQuanYi Micro Hei']
plt.rcParams['axes.unicode_minus'] = False
# 2. 定义标题
title = '各类别产品数量分布'
# 3. 绘图
plt.figure(figsize=(10, 6))
ax = query_result.plot(kind='bar', color='steelblue')
plt.title(title)
plt.xlabel('类别')
plt.ylabel('数量')
plt.xticks(rotation=45, ha='right')
# 4. 添加数值标签
for p in ax.patches:
ax.annotate(str(p.get_height()), (p.get_x() + p.get_width() / 2., p.get_height()),
ha='center', va='center', xytext=(0, 5), textcoords='offset points')
plt.tight_layout()
plt.show()
3.4 PandasQueryEngine 总结 链接到标题
适用场景对比 链接到标题
| 数据规模 | 推荐方案 | 原因 |
|---|---|---|
| < 1万行 | PandasQueryEngine | 零延迟、精准计算 |
| 1-10万行 | Text-to-SQL | 数据库优化、索引支持 |
| > 10万行 | Hybrid Retrieval | 向量检索+过滤 |
优化技巧 链接到标题
- 指令工程: 提供清晰的
instruction_str指导 LLM - 错误处理: 实现重试机制应对 LLM 偶发错误
- 性能监控: 记录查询耗时和成功率
- 复杂问题: 使用 Chain of Tables 分解为简单步骤
局限性 链接到标题
- ⚠️ 大数据量时内存开销大
- ⚠️ 复杂查询可能生成错误代码
- ⚠️ 不支持跨表 JOIN(需预处理)
第四章 向量检索+BM25基准 链接到标题
4.1 方案原理 链接到标题
在处理结构化数据(如数据库表、Excel、CSV)的 RAG 场景中,我们将每一行数据视为一个独立的文档进行索引。然而,结构化数据具有其独特性:它既包含需要语义理解的非结构化字段(如“商品描述”、“用户评论”),也包含需要精确匹配的结构化字段(如“SKU编码”、“具体参数”、“年份”)。面对这种混合特征,向量检索 + BM25 的组合拳是目前业界在语义检索方面的最佳实践方案。
一、 核心原理:行数据的双路召回 链接到标题
该方案首先将结构化表格的每一行数据进行序列化(Serialization),转化为包含字段名和字段值的文本段落(例如将 {‘Name’: ‘iPhone 15’, ‘Price’: 7999} 转换为 “Name: iPhone 15, Price: 7999”)。基于转换后的文本,系统同时维护两套索引:
向量检索(Vector Retrieval):负责“懂意”。 它将行文本映射到高维语义空间。当用户询问“适合移动办公的轻薄本”时,即使表格中没有“轻薄”二字,只写了“重量1.1kg”,向量模型也能捕捉到两者间的语义关联,从而召回相关记录。它弥补了传统数据库无法进行模糊语义查询的短板。
BM25 检索(Keyword Retrieval):负责“精准”。 它基于概率统计模型(TF-IDF 的改进版),对稀有且明确的关键词极其敏感。在结构化数据中,往往存在大量的专有名词(如“RTX4090”、“ISO9001”)。向量模型容易忽略这些低频词的精确差异,而 BM25 能确保这些核心参数必须出现在召回结果中,充当了“模糊 SQL”的硬过滤器。

方案 1:表格行级向量化 链接到标题
# 示例:向量检索实现
from llama_index.core import VectorStoreIndex, Document
import pandas as pd
from sqlalchemy import create_engine
# 数据库连接
db_url = "sqlite:///dataset/ecommerce.db"
engine = create_engine(db_url)
# 加载数据
df = pd.read_sql("SELECT * FROM products LIMIT 15", engine)
print(df.head(5))
# 将每行转为文本
documents = []
for _, row in df.iterrows():
text = f"""
产品名称: {row.get('name', row.get('product_name', 'Unknown'))}
类别: {row.get('category', 'N/A')}
价格: {row.get('price', row.get('unit_price', 0))}元
"""
documents.append(Document(text=text, metadata=row.to_dict()))
# 创建向量索引
vector_index = VectorStoreIndex.from_documents(
documents,
insert_batch_size=10 # ✅ DashScope同步接口限制:每批最多10条
)
vector_retriever = vector_index.as_retriever(similarity_top_k=3)
print(f"✅ 向量索引创建完成: {len(documents)}个文档")
documents
方案 2:分块向量化(推荐用于大规模数据) 链接到标题
# 示例:分块向量化实现
from llama_index.core import VectorStoreIndex, Document
import pandas as pd
from sqlalchemy import create_engine
# 数据库连接
db_url = "sqlite:///dataset/ecommerce.db"
engine = create_engine(db_url)
# 加载数据
df = pd.read_sql("SELECT * FROM orders", engine)
print(f"总数据量: {len(df)}行")
# ========== 分块参数 ==========
CHUNK_SIZE = 10 # 每个分块包含的行数
# 将多行合并为一个文档(分块向量化)
documents = []
for chunk_start in range(0, len(df), CHUNK_SIZE):
# 取出当前分块的数据
chunk_df = df.iloc[chunk_start:chunk_start + CHUNK_SIZE]
# 将分块内所有行拼接为文本
text_parts = []
for _, row in chunk_df.iterrows():
item = f"产品id: {row.get('product_id')} | 用户id: {row.get('user_id')} | 数量: {row.get('quantity')} | 订单日期: {row.get('order_date')} | 订单金额: {row.get('amount')}元 | 订单状态: {row.get('status')} "
text_parts.append(item)
# 合并为单个文档
chunk_text = "\n".join(text_parts)
# 元数据记录分块信息
metadata = {
"chunk_id": chunk_start // CHUNK_SIZE,
"row_range": f"{chunk_start}-{min(chunk_start + CHUNK_SIZE, len(df))}",
"status": list(chunk_df['status'].unique()) # 分块包含的状态
}
documents.append(Document(text=chunk_text, metadata=metadata))
print(f"生成 {len(documents)} 个分块文档 (每块 {CHUNK_SIZE} 行)")
# 创建向量索引
vector_index = VectorStoreIndex.from_documents(
documents,
insert_batch_size=10 # DashScope限制
)
vector_retriever = vector_index.as_retriever(similarity_top_k=3)
print(f"✅ 分块向量索引创建完成")
print(documents[0])
方案 3:混合索引(最灵活) 链接到标题
Milvus向量数据库存储 + BM25检索 链接到标题
"""
Milvus混合向量化示例
核心优势:
Milvus原生支持元数据(标量字段)过滤
性能优异,适合大规模生产环境
支持本地Lite模式(./milvus.db)和服务器模式
"""
from typing import Optional, Dict, Any, List, Tuple, Union
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
from tqdm import tqdm
from llama_index.core import (
VectorStoreIndex,
Document,
StorageContext,
Settings
)
from llama_index.core.vector_stores import (
MetadataFilters,
MetadataFilter,
FilterOperator
)
from llama_index.vector_stores.milvus import MilvusVectorStore
from llama_index.embeddings.dashscope import DashScopeEmbedding
import os
from dotenv import load_dotenv
from sqlalchemy import inspect
# 加载环境变量
load_dotenv()
class UniversalHybridVectorizer:
"""
通用混合索引 - 使用Milvus支持元数据过滤
核心功能:
1. 支持CSV/Excel/数据库多种数据源
2. 数据库自动推断字段:利用 sqlalchemy.inspect 扫描表结构,自动将字段归类:
长文本(Text/LongText):自动划为 text_fields,用于生成向量,提供语义搜索能力。
数值/日期(Int/Float/Date):自动划为 metadata_fields,用于范围查询(如“价格>5000”)。
短文本(Tags/Category):自动划为 metadata_fields,用于精确匹配(如“品牌=Apple”)。
3. 使用Milvus实现高性能向量检索 + 标量过滤
"""
def __init__(
self,
db_url: Optional[str] = None,
table: Optional[str] = None,
# ===== 可选:手动覆盖自动推断 =====
text_fields: Optional[List[str]] = None,
metadata_fields: Optional[List[str]] = None,
id_field: Optional[str] = None,
numeric_fields: Optional[List[str]] = None,
categorical_fields: Optional[List[str]] = None
):
"""
初始化向量化器
方式1(推荐):传入数据库连接,自动推断字段
vectorizer = UniversalHybridVectorizer(
db_url="sqlite:///ecommerce.db",
table="products"
)
方式2:手动指定所有字段(用于CSV/Excel或需要精确控制)
vectorizer = UniversalHybridVectorizer(
text_fields=["name", "description"],
metadata_fields=["price", "category"]
)
Args:
db_url: 数据库连接字符串(可选)
table: 表名,用于自动推断字段(可选)
text_fields: 需要向量化的字段列表
metadata_fields: 作为元数据的字段列表
id_field: 唯一标识符字段名
numeric_fields: 数值字段列表(用于范围查询)
categorical_fields: 类别字段列表(用于精确/IN查询)
"""
self.engine = None
self.table = table
# 如果提供了数据库连接,创建引擎
if db_url:
self.engine = create_engine(db_url)
print(f"🔌 连接数据库: {db_url}")
# 自动推断字段(如果提供了db_url和table且未手动指定字段)
if self.engine and table and not text_fields:
auto_fields = self._auto_detect_fields(table)
self.text_fields = text_fields or auto_fields.get('text', [])
self.metadata_fields = metadata_fields or auto_fields.get('metadata', [])
self.id_field = id_field or auto_fields.get('id')
self.numeric_fields = numeric_fields or auto_fields.get('numeric', [])
self.categorical_fields = categorical_fields or auto_fields.get('categorical', [])
print(f"📊 自动推断字段配置:")
print(f" 文本字段: {self.text_fields}")
print(f" 数值字段: {self.numeric_fields}")
print(f" 分类字段: {self.categorical_fields}")
print(f" ID字段: {self.id_field}")
else:
# 使用手动指定的字段
self.text_fields = text_fields or []
self.metadata_fields = metadata_fields or []
self.id_field = id_field
self.numeric_fields = numeric_fields or []
self.categorical_fields = categorical_fields or []
self.index = None
def _auto_detect_fields(self, table: str) -> Dict[str, Any]:
"""
从数据库表结构自动推断字段分类
分类规则:
- TEXT, LONGTEXT, VARCHAR(255+) → text_fields
- INTEGER, FLOAT, DECIMAL → numeric_fields + metadata_fields
- VARCHAR(<255) → categorical_fields + metadata_fields
- PRIMARY KEY → id_field
"""
# 获取表结构
inspector = inspect(self.engine)
# 获取表的列信息
columns = inspector.get_columns(table)
# 获取主键约束
pk_constraint = inspector.get_pk_constraint(table)
result = {
'text': [],
'metadata': [],
'numeric': [],
'categorical': [],
'id': None
}
# 获取主键
if pk_constraint and pk_constraint.get('constrained_columns'):
result['id'] = pk_constraint['constrained_columns'][0]
for col in columns:
col_name = col['name']
col_type = str(col['type']).upper()
# 跳过主键(已单独处理)
if col_name == result['id']:
continue
# 根据类型分类
if 'TEXT' in col_type or 'LONGTEXT' in col_type:
result['text'].append(col_name)
elif 'VARCHAR' in col_type:
# 解析 VARCHAR 长度
import re
match = re.search(r'VARCHAR\((\d+)\)', col_type)
length = int(match.group(1)) if match else 50
if length >= 255:
result['text'].append(col_name)
else:
result['categorical'].append(col_name)
result['metadata'].append(col_name)
elif any(t in col_type for t in ['INT', 'FLOAT', 'DECIMAL', 'NUMERIC', 'REAL']):
result['numeric'].append(col_name)
result['metadata'].append(col_name)
elif any(t in col_type for t in ['DATE', 'TIME', 'DATETIME', 'TIMESTAMP']):
result['metadata'].append(col_name)
else:
# 其他类型默认加入metadata
result['metadata'].append(col_name)
return result
def load_from_database(
self,
table: Optional[str] = None,
query: Optional[str] = None,
limit: Optional[int] = None,
chunk_size: int = 1000
) -> List[Document]:
"""
从数据库加载数据
Args:
table: 表名(默认使用初始化时的表)
query: 自定义SQL查询(支持JOIN,优先于table)
limit: 限制返回行数
chunk_size: 分批读取的行数
Examples:
# 简单用法:加载初始化时指定的表
docs = vectorizer.load_from_database()
# 指定表和限制
docs = vectorizer.load_from_database(table="products", limit=1000)
# 自定义SQL(支持JOIN)
docs = vectorizer.load_from_database(
query=\"\"\"
SELECT p.*, r.content as review_content
FROM products p
LEFT JOIN reviews r ON p.id = r.product_id
\"\"\"
)
"""
if not self.engine:
raise ValueError("未配置数据库连接,请在初始化时传入 db_url")
# 确定要使用的表
target_table = table or self.table
# 构建SQL
if query:
sql = query
print(f"🗄️ 执行自定义查询...")
# 检测查询中的新列并尝试自动推断
self._detect_new_columns_from_query(query)
elif target_table:
sql = f"SELECT * FROM {target_table}"
if limit:
sql += f" LIMIT {limit}"
print(f"🗄️ 加载表: {target_table}" + (f" (LIMIT {limit})" if limit else ""))
else:
raise ValueError("必须指定 table 或 query")
# 分批读取
all_documents = []
chunks = pd.read_sql(sql, self.engine, chunksize=chunk_size)
for df_chunk in tqdm(chunks, desc="处理数据库数据"):
documents = self._convert_to_hybrid_documents(df_chunk)
all_documents.extend(documents)
print(f"✅ 生成 {len(all_documents)} 个文档")
return all_documents
def _convert_to_hybrid_documents(self, df: pd.DataFrame) -> List[Document]:
"""
“结构化”转“非结构化”的关键中间层,转换为混合文档
策略:
1. 文本化(Text Parts):将所有 text_fields 的内容拼接成一段自然语言描述(例如:"name: iPhone 15\ndescription: 最新款苹果手机..."),
这部分会被 Embedding 模型转化为向量。
2. 元数据化(Metadata):将所有 metadata_fields(如价格、类别、库存)保留为原始键值对(Key-Value),这部分会被存入 Milvus 的标量字段中,
用于后续的 Filter 操作。
这实现了**“语义在向量里,属性在字段里”**的双重存储结构。
"""
documents = []
for idx, row in df.iterrows():
# 1. 构建向量化文本
text_parts = []
for field in self.text_fields:
if field in row and pd.notna(row[field]):
text_parts.append(f"{field}: {row[field]}")
text = "\n".join(text_parts) if text_parts else "N/A"
# 2. 构建元数据
metadata = {}
for field in self.metadata_fields:
if field in row and pd.notna(row[field]):
value = row[field]
# 处理pandas特殊类型
if isinstance(value, (pd.Timestamp, pd.Timedelta)):
value = str(value)
metadata[field] = value
# 3. 确定文档ID
if self.id_field and self.id_field in row:
doc_id = str(row[self.id_field])
else:
doc_id = str(idx)
# 4. 创建文档
doc = Document(
text=text,
metadata=metadata,
doc_id=doc_id
)
documents.append(doc)
return documents
def build_index(self, documents: List[Document],
save_path: str = "./hybrid_index",
collection_name: str = "hybrid_collection",
milvus_uri: str = "./milvus_demo.db"):
"""
构建混合索引(使用Milvus支持元数据过滤)
Args:
documents: 文档列表
save_path: 索引保存路径(兼容参数,实际数据存储在Milvus)
collection_name: Milvus集合名称
milvus_uri: Milvus连接URI
- 本地模式: "./milvus_demo.db" (使用Milvus Lite)
- 服务器模式: "http://localhost:19530"
"""
print(f"🔨 构建混合索引,文档数: {len(documents)}")
from llama_index.vector_stores.milvus import MilvusVectorStore
# 使用Milvus构建向量存储(支持元数据过滤!)
# dim=1024 对应 DashScope text-embedding-v4 的向量维度
vector_store = MilvusVectorStore(
uri=milvus_uri, # Milvus连接URI
collection_name=collection_name, # Milvus集合名称
dim=1024, # 向量维度,最好跟Embedding模型长度一致
overwrite=True # 如果存在同名collection则覆盖
)
# 使用MilvusVectorStore构建存储上下文
storage_context = StorageContext.from_defaults(
vector_store=vector_store
)
# 使用VectorStoreIndex构建向量索引
self.index = VectorStoreIndex.from_documents(
documents,
storage_context=storage_context, # 使用MilvusVectorStore
show_progress=True, # 显示进度
insert_batch_size=10 # ✅ DashScope同步接口限制:每批最多10条
)
# Milvus数据是自托管的,不需要persist到文件系统
print(f"✅ 索引已创建: collection={collection_name}, uri={milvus_uri}")
vector_retriever = self.index.as_retriever(similarity_top_k=5)
return vector_retriever
print("\n" + "="*60)
print("🚀 Milvus混合索引示例 - 电商产品数据")
print("="*60)
# 1. 创建向量化器
vectorizer = UniversalHybridVectorizer(
db_url="sqlite:///dataset/ecommerce.db",
table="products"
)
# 自动打印:文本字段: ['description'], 数值字段: ['price', 'stock']...
# 2. 加载数据
docs = vectorizer.load_from_database() # 使用初始化时的表
# 3. 构建Milvus检索器
vector_retriever = vectorizer.build_index(
docs,
collection_name="products",
milvus_uri="http://localhost:19530"
)
BM25检索器 链接到标题
在本项目中,BM25 检索器的实现承担着能够“精确匹配关键词”的重要角色。其实现逻辑由底层的核心算法与上层的过滤增强两个层面构成,采用了一种典型的“后过滤”(Post-Filtering)架构。
- 核心算法原理(Standard BM25) LlamaIndex 的 BM25Retriever 基于经典的概率检索模型(通常封装 rank_bm25 库)。它在内存中工作,主要流程如下:
倒排索引构建:启动时,系统会将所有文档节点(Nodes)的文本进行分词,并建立词到文档的映射关系。
相关性评分:当用户查询时,算法根据词频(TF)和逆文档频率(IDF)计算文档得分。
它会奖励稀有词的匹配(例如“RTX4090”的权重远高于“电脑”)。
它会对文档长度进行归一化惩罚,防止长文档仅因包含更多词汇而获得不公平的高分。
目前主流BM25检索器框架对比
| 维度 | Elasticsearch | LlamaIndex BM25 | Whoosh |
|---|---|---|---|
| 核心定位 | 企业级分布式搜索引擎业界标准的全文检索引擎,功能极其强大且成熟。 | RAG 专用轻量检索器LlamaIndex 框架内置组件,专为混合检索(Hybrid Search)设计。 | Python 原生轻量检索引擎纯 Python 实现的独立搜索库,无任何外部依赖。 |
| 部署架构 | 独立服务 (Server)需要部署独立的 ES 集群(Java),资源消耗大,运维成本高。 | 内存/本地运行 (In-Memory)通常基于内存(rank_bm25)或本地文件,作为 Python 进程的一部分运行。 | 本地库 (Library)纯 Python 库,索引文件存储在本地磁盘,即插即用,无需独立进程。 |
| BM25 实现 | 高度优化基于 Lucene 内核,性能极高,支持分词、停用词、同义词等复杂 NLP 处理。 | 简化版/包装器通常封装了 rank_bm25 库,功能相对基础,主要依赖前置的分词处理。 | 可定制版内置了 BM25F 等变种算法,支持 Python 扩展评分逻辑,灵活性较好。 |
| 扩展性 | ⭐⭐⭐⭐⭐ (极强)支持 PB 级海量数据,分布式横向扩展,高可用。 | ⭐⭐ (较弱)受限于单机内存或简单的文件存储,适合中小规模数据(如几万条文档)。 | ⭐⭐ (一般)单机文件存储,数据量过大时(如百万级)性能下降明显,不支持分布式。 |
| 上手难度 | ⭐⭐⭐ (较高)由于是独立服务,需要学习 DSL 查询语法、索引配置和集群运维。 | ⭐ (极低)代码集成简单,几行 Python 代码即可初始化,与 RAG 流程无缝衔接。 | ⭐⭐ (中等)需要定义 Schema 和编写索引逻辑,比 ES 简单但比 LlamaIndex 繁琐。 |
| 核心优势 | 生产级稳定性与生态拥有 Kibana 可视化、丰富的插件生态,适合对检索要求极高的核心业务。 | RAG 集成度最高天生适配 Vector Store,一行代码实现“向量+关键词”混合检索。 | 纯 Python 零依赖无需安装 Java 或 Docker,非常适合嵌入式环境或纯 Python 轻量应用。 |
| 适用场景 | 1. 海量数据(千万/亿级)的企业知识库。2. 需要高并发、高可用支持的生产环境。3. 已有 ES 基础设施的复用。 | 1. RAG 快速原型开发。2. 数据量较小(<10万文档)的垂直领域知识库。3. 极速验证混合检索效果。 | 1. 单机/桌面应用(如本地文档搜索工具)。2. 无法额外部署重型服务(ES)的受限环境。3. 中小规模数据的纯 Python 项目。 |
from llama_index.core.schema import TextNode
from llama_index.retrievers.bm25 import BM25Retriever
import pandas as pd
from sqlalchemy import create_engine
# 数据库连接
db_url = "sqlite:///dataset/ecommerce.db"
engine = create_engine(db_url)
# 加载 products 表数据
df = pd.read_sql("SELECT * FROM products", engine)
print(f"总数据量: {len(df)}行")
# 直接创建 Nodes(跳过 Documents)
nodes = []
for idx, row in df.iterrows():
# 将所有字段都添加到 text 中
text = f"""产品ID: {row.get('product_id')}
产品名称: {row.get('name')}
类别: {row.get('category')}
价格: {row.get('price')}元
库存: {row.get('stock')}件
评分: {row.get('rating')}
描述: {row.get('description')}"""
node = TextNode(
text=text,
metadata=row.to_dict(),
id_=f"product_{idx}"
)
nodes.append(node)
# 创建BM25 检索器
bm25_retriever = BM25Retriever.from_defaults(
nodes=nodes,
similarity_top_k=5
)
print(f"✅ 已创建 {len(nodes)} 个产品节点")
nodes
创建QueryFusionRetriever混合检索器 链接到标题
from llama_index.core.retrievers import QueryFusionRetriever
from llama_index.core.query_engine import RetrieverQueryEngine
# 1. 创建QueryFusionRetriever混合检索器
retriever = QueryFusionRetriever(
retrievers=[vector_retriever, bm25_retriever], # 集成不同的检索器
similarity_top_k=5, # 最终返回的文档数量
num_queries=1, # 为原始查询生成的变体数量,(默认为1,不生成变体)
mode="reciprocal_rerank", # 结果融合模式,'reciprocal_rerank'是常用且效果好的模式
use_async=False, # 是否异步执行,Jupyter Notebook 环境做测试不用开启异步
verbose=True, # 是否打印调试信息
retriever_weights=[0.8,0.2], # 检索器权重,用于加权融合
)
# 2.构建QueryEngine,查询引擎
retriever_query_engine = RetrieverQueryEngine.from_args(
retriever=retriever
)
# 3.构建测试问题
test_queries = [
"适合办公的笔记本 (价格5000-15000元)",
"适合办公的电子产品", # 语义查询
"价格在1000元以内的产品", # 数值范围查询,
"所有产品的平均价格是多少?", # 聚合查询
]
print("\n向量检索测试结果:")
print("=" * 80)
for query in test_queries:
print(f"\n查询: {query}")
response = retriever_query_engine.query(query)
print(f"回答: {response}")
print("-" * 80)
实现rerank重排序检索 链接到标题
#!pip install modelscope
# 需要先从魔搭社区下载bge-reranker-v2-m3 rerank模型到本地
!modelscope download --model BAAI/bge-reranker-v2-m3 --local_dir ./models/bge-reranker-v2-m3
from llama_index.core.postprocessor import SentenceTransformerRerank
from llama_index.core.query_engine import RetrieverQueryEngine
# 模型本地路径
local_model_path = "./models/bge-reranker-v2-m3"
# 1.定义精排序器 (Reranker)
reranker = SentenceTransformerRerank(
model=local_model_path, # 指定模型路径
top_n=3, # 精选出最相关的4个文档送给LLM,
)
# 2.构建QueryEngine
query_engine = RetrieverQueryEngine.from_args(
retriever=retriever,
node_postprocessors=[reranker] # 添加精排序器
)
# 3. 执行查询
query = "请帮我查询适合办公的笔记本 (价格5000-15000元)有哪些?请把产品名称和价格都展示出来"
response = query_engine.query(query)
print(response.response)
response.get_formatted_sources
性能评估 链接到标题
测试结果 (基于2个测试查询):
| 查询类型 | 准确率 | 示例查询 | 问题 |
|---|---|---|---|
| 精确数值 | 25% ❌ | “价格1000元以内的产品” | 无法精确筛选范围 |
| 聚合统计 | 10% ❌ | “平均价格” | 无法计算SUM/AVG |
| 语义理解 | 88% ✅ | “适合办公” | 语义匹配良好 |
| 整体 | 45% | - | 不适合精确查询 |
核心问题:
❌ 无法精确筛选数值范围
❌ 无法进行聚合计算
❌ 无法排序和Top-K查询
✅ 语义理解能力较强
Attu Milvus官方GUI管理工具 链接到标题
Attu 是 Milvus 官方的 GUI 管理工具,可以通过 Docker 快速启动,拥有非常友好的界面来查看 Scheme、浏览数据和执行向量搜索。
启动 Attu
在终端中运行以下命令(假设您的 Milvus IP 为宿主机 IP,或者都在 Docker 网络中):
docker run -p 8000:3000 -e MILVUS_URL=host.docker.internal:19530 zilliz/attu:latest
注意:如果不确定网络配置,可以直接使用 MILVUS_URL={您的本机IP}:19530
使用方法
浏览器访问 http://localhost:8000
连接到 Milvus (IP: host.docker.internal 或本机 IP, 端口: 19530)
在左侧菜单选择 Collections -> 点击 products
您可以在 Data Preview 标签页直接看到表格形式的数据
在 Vector Search 标签页进行向量相似度搜索测试

第五章:Text-to-SQL基础 链接到标题
5.1 方案原理 链接到标题
Text-to-SQL 是一种将自然语言自动转换为结构化查询语言(SQL)的技术方案。在处理结构化数据的 RAG 系统中,它是解决“精确统计”与“复杂逻辑计算”问题的终极武器,其本质是将 LLM 从“阅读者”升级为“操作者”。
一、 核心原理:语义到代码的精准翻译
不同于向量检索基于“语义相似度”的模糊匹配模式,Text-to-SQL 采用的是**“逻辑推理+代码生成”**模式。其核心在于利用 LLM 强大的理解力,将由人类发出的模糊指令(如“统计上季度销售额增长超过20%的部门”)翻译成数据库能够严格执行的标准 SQL 语句。在这个过程中,LLM 充当了一个高智商的中间层,它不仅要理解用户的意图,还要理解数据库的表结构(Schema)和表之间的关联关系(Relationship)。
用户查询 → LLM生成SQL → 数据库执行 → 结果 → LLM生成答案
大概流程如下:

二、 实现思路与关键流程
一个成熟的 Text-to-SQL 工作流通常包含以下四个核心步骤:
上下文注入 (Context Injection):系统首先提取数据库的元数据(表名、字段名、字段类型、备注等),将其转化为 Prompt 的一部分。这一步至关重要,它相当于告诉 LLM “你面前有哪些数据表可以使用”。
SQL 生成 (SQL Generation):LLM 结合用户问题和表结构信息,推理并生成相应的 SQL 语句(例如 SELECT department, SUM(sales) … GROUP BY …)。高级实现还会包含“SQL 语法检查器”,确保生成的代码没有语法错误。
沙箱执行 (Safe Execution):生成的 SQL 被送入数据库引擎执行。为了安全起见,通常会限制执行权限(如仅允许 SELECT 操作),防止恶意删除或修改数据。
结果解读 (Answer Synthesis):数据库返回的通常是行数据或统计数值(如 [(Marketing, 150000), (Sales, 200000)])。最后一步是将这些冷冰冰的数据再次喂给 LLM,让其生成通俗易懂的自然语言回答(如“市场部和销售部表现优异…”)。
三、 核心优势与场景
Text-to-SQL 的最大价值在于**“计算的精确性”**。向量检索无法准确回答“所有产品的平均价格是多少”或“库存少于10的产品有哪些”这类问题,而 SQL 引擎天生就是为此设计的。因此,该方案特别适用于财务报表分析、业务数据统计、库存管理查询等对数值精度和聚合逻辑有严格要求的场景。
Text-to-SQL 主流工具对比分析:
| 维度 | LlamaIndex | LangChain | Vanna AI |
|---|---|---|---|
| 核心定位 | 数据索引与检索专家专注于连接 LLM 与外部数据,提供高效的上下文索引能力。 | 全能型 LLM 编排框架提供最广泛的组件和工具链,用于构建复杂的应用逻辑。 | Text-to-SQL 垂直领域专家专为“自然语言转 SQL”场景设计的 RAG 框架。 |
| SQL 生成方式 | **基于检索增强 (RAG)**擅长先检索相关 Schema/Table 元数据,再生成 SQL,适合大规模 Schema。 | **基于链式调用 (Chain)**通过 SQLDatabaseChain 等组件组合 Prompt 和执行逻辑,灵活性极高。 | **基于训练/反馈 (Train/RAG)**内置了专门针对 SQL 优化的 RAG 流程,且具备“训练”能力(基于历史问答优化)。 |
| 准确率优化 | ✅ 检索精度高通过 ObjectRetriever 等机制,能从海量表中精准定位相关表,减少 token 消耗。 | ⚠️ 依赖 Prompt 工程需要开发者自行设计 Few-shot 样本和优化 Prompt 策略来提升效果。 | 🚀 准确率极高自动将 Schema、DDL、文档和历史正确 SQL 向量化,越用越准。 |
| 上手难度 | ⭐⭐ (中等)概念较多(Index, Retriever),但对数据层的封装非常完善。 | ⭐⭐⭐ (较高)组件极其丰富,定制化开发需要较强的代码和逻辑编排能力。 | ⭐ (极低)API 设计简洁,几乎开箱即用,只需几行代码即可连接数据库并提问。 |
| 灵活性 | ⭐⭐⭐数据接入灵活,检索策略可定制。 | ⭐⭐⭐⭐⭐逻辑、Prompt、工具调用完全由开发者掌控,可实现极复杂的 Agent。 | ⭐⭐专注 SQL 生成,流程相对固定,但通过配置也能适配多种 LLM 和向量库。 |
| 核心优势 | Token 效率高、适合大表在几百张表的数据库中,能快速检索出最相关的几张表给 LLM,避免上下文超长。 | 生态丰富、上限高由于是通用框架,可以轻松结合其他非 SQL 工具(如搜索、计算器)构建综合 Agent。 | 自带前端与可视化内置了 Streamlit/Flask UI,甚至支持生成 Plotly 图表,交付速度最快。 |
| 适用场景 | 1. 表数量巨大的企业级数仓。2. 需要结合非结构化文档(如 PDF 合同查数据)的混合检索场景。 | 1. 需要高度定制业务逻辑(如审批流、多跳推理)。2. 已有 LangChain 生态的项目集成。 | 1. 快速验证 Text-to-SQL 可行性。2. 需要高准确率且表结构相对稳定的生产环境。3. 需快速交付带可视化界面的 Demo。 |
5.2 创建测试数据库 链接到标题
我们创建一个电商数据库,包含:
products表: 15个产品
users表: 10个用户
orders表: 100个订单
reviews表: 50条评论
数据保存在ecommerce.db文件中,可以自行查看
# 创建电商数据库的完整代码会非常长
# 这里提供核心创建逻辑,完整版见项目文件
import sqlite3
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import random
from pathlib import Path
from sqlalchemy import create_engine, text
from llama_index.core import SQLDatabase, Settings
# 使用已有的day6测试数据库作为示例
db_path = "../day6_integration/large_data_test.db"
# 数据库文件路径
db_path = Path("dataset/ecommerce.db")
if not db_path.exists():
print("❌ 数据库文件不存在!")
print("请先运行: python init_database.py")
else:
# 创建 SQLAlchemy 引擎
engine = create_engine(f"sqlite:///{db_path}")
# 创建 LlamaIndex SQLDatabase 对象
sql_database = SQLDatabase(engine)
print("✅ 数据库连接成功")
print(f" 数据库路径: {db_path.absolute()}")
print("✅ 数据库创建完成")
5.2.1 查看示例数据 链接到标题
# 数据库相关
import sqlite3
# 工具库
import pandas as pd
from rich import print as rprint
from rich.console import Console
from rich.table import Table
console = Console()
# 查看用户表前5条数据
with engine.connect() as conn:
result = conn.execute(text("SELECT * FROM users LIMIT 5"))
df_users = pd.DataFrame(result.fetchall(), columns=result.keys())
console.print("\n[bold cyan]👥 用户表示例数据:[/bold cyan]")
display(df_users)
# 查看产品表前5条数据
with engine.connect() as conn:
result = conn.execute(text("SELECT * FROM products LIMIT 5"))
df_products = pd.DataFrame(result.fetchall(), columns=result.keys())
console.print("\n[bold cyan]📦 产品表示例数据:[/bold cyan]")
display(df_products)
注意: 完整的数据库创建代码较长,已保存在项目的large_data_benchmark.py中。这里我们直接使用已创建的数据库进行演示。
5.2.2 创建SQL查询引擎 链接到标题
# Text-to-SQL实现
from llama_index.core import SQLDatabase
from llama_index.core.query_engine import NLSQLTableQueryEngine
from sqlalchemy import create_engine
# 创建SQL数据库连接
engine = create_engine("sqlite:///dataset/ecommerce.db")
sql_database = SQLDatabase(engine, include_tables=["products", "orders", "users","reviews"])
# 创建SQL查询引擎
sql_query_engine = NLSQLTableQueryEngine(
sql_database=sql_database,
synthesize_response=True
)
print("✅ Text-to-SQL引擎创建完成")
5.2.3 查看数据库表结构 链接到标题
# 获取所有表名
table_names = sql_database.get_usable_table_names()
console.print("\n[bold cyan]📊 数据库表列表:[/bold cyan]")
for table in table_names:
console.print(f" • {table}")
# 查看每个表的结构
console.print("\n[bold cyan]📋 表结构详情:[/bold cyan]")
for table in table_names:
console.print(f"\n[yellow]{table}[/yellow]:")
schema = sql_database.get_single_table_info(table)
console.print(schema)
# 测试SQL查询
sql_test_queries = [
"有多少个产品?",
"价格最高的5个产品是什么?",
"我们店里哪个牌子的电脑卖得最好?",
]
for query in sql_test_queries:
print(f"\n\n\n")
print(f"=== 测试查询: {query} ===")
response = sql_query_engine.query(query)
print(f"回答: {response}")
print(f"SQL: {response.metadata.get('sql_query', 'N/A')}")
5.3 性能评估 链接到标题
测试结果:
| 查询类型 | 向量检索 | Text-to-SQL | 提升 |
|---|---|---|---|
| 精确数值 | 25% | 85% ✅ | +60% |
| 聚合统计 | 10% | 95% ✅ | +85% |
| 语义理解 | 88% | 40% ❌ | -48% |
| 整体 | 45% | 75% | +30% |
核心发现:
✅ 数值查询大幅提升(25% → 85%)
✅ 聚合统计几乎完美(95%)
❌ 语义理解能力下降(88% → 40%),在第三个问题中没有根据语义信息进行回答
⚠️ SQL生成错误率约15-20%
第六章:Text-to-SQL优化 链接到标题
6.1 优化技术概览 链接到标题
我们将应用4种优化技术:
Schema增强 - 添加表和字段的详细描述
Few-shot示例 - 提供高质量查询示例
SQL验证 - 自动检测和修复错误
查询缓存 - 减少重复LLM调用
预期提升: 75% → 92%
6.2 Schema 增强技术 链接到标题
6.2.1 为什么需要 Schema 增强? 链接到标题
基础的表结构信息可能不足以让 LLM 准确理解数据含义。通过添加:
表的业务描述
字段的详细说明
字段之间的关系
示例值
可以显著提升 SQL 生成的准确性。
6.2.2 创建增强的 Schema 链接到标题
from llama_index.core.objects import SQLTableSchema
# 为每个表创建增强的 Schema
table_schemas = [
SQLTableSchema(
table_name="users",
context_str="""
用户表,存储所有注册用户的基本信息。
字段说明:
- user_id: 用户唯一标识符
- name: 用户姓名
- email: 用户邮箱地址
- region: 用户所在地区(如:北京、上海、广州等)
- register_date: 用户注册日期
- vip_level: VIP等级(0=普通用户, 1-3=不同等级VIP)
- total_spent: 用户累计消费金额(单位:元)
业务规则:
- VIP等级越高,享受的折扣越多
- total_spent 会随着订单完成自动更新
"""
),
SQLTableSchema(
table_name="products",
context_str="""
产品表,存储所有可售卖的商品信息。
字段说明:
- product_id: 产品唯一标识符
- name: 产品名称
- category: 产品分类(电子产品、家用电器、服装鞋帽、家具等)
- price: 产品价格(单位:元)
- stock: 库存数量
- rating: 产品评分(1-5分,保留两位小数)
- description: 产品详细描述
业务规则:
- rating 基于用户评论自动计算
- stock 为0时产品不可购买
"""
),
SQLTableSchema(
table_name="orders",
context_str="""
订单表,记录所有用户的购买订单。
字段说明:
- order_id: 订单唯一标识符
- user_id: 下单用户ID(外键关联 users 表)
- product_id: 购买产品ID(外键关联 products 表)
- quantity: 购买数量
- amount: 订单总金额(单位:元)= price * quantity
- order_date: 下单日期
- status: 订单状态(已完成、配送中、已取消等)
业务规则:
- 只有状态为"已完成"的订单才计入销售额
- amount 已包含所有折扣
"""
),
SQLTableSchema(
table_name="reviews",
context_str="""
评论表,存储用户对产品的评价。
字段说明:
- review_id: 评论唯一标识符
- user_id: 评论用户ID(外键关联 users 表)
- product_id: 被评论产品ID(外键关联 products 表)
- rating: 评分(1-5分整数)
- comment: 评论内容
- review_date: 评论日期
业务规则:
- 用户必须购买过产品才能评论
- rating 影响产品的总体评分
"""
)
]
print("✅ Schema 增强配置完成")
6.2.3 使用增强 Schema 创建查询引擎 链接到标题
# 创建带有增强 Schema 的查询引擎
enhanced_query_engine = NLSQLTableQueryEngine(
sql_database=sql_database, # SQL 数据库对象
tables=["users", "products", "orders", "reviews"], # 指定可供查询的表列表
table_schemas=table_schemas, # 注入包含列描述的增强 Schema
verbose=True, # 输出详细的 SQL 生成过程
synthesize_response=True # 将 SQL 执行结果转换为自然语言回复
)
print("✅ 增强查询引擎创建成功")
6.2.4 对比测试:基础 vs 增强 链接到标题
6.2.4.1 测试查询 1: 复杂业务查询 链接到标题
query = "哪些VIP用户的累计消费超过10000元?"
console.print(f"\n[bold green]🔍 查询:[/bold green] {query}")
# 基础引擎
console.print("\n[bold cyan]📊 基础引擎结果:[/bold cyan]")
try:
response_basic = sql_query_engine.query(query)
console.print(f"回答: {response_basic}")
console.print(f"SQL: {response_basic.metadata.get('sql_query', 'N/A')}")
except Exception as e:
console.print(f"[red]错误: {e}[/red]")
# 增强引擎
console.print("\n[bold cyan]✨ 增强引擎结果:[/bold cyan]")
try:
response_enhanced = enhanced_query_engine.query(query)
console.print(f"回答: {response_enhanced}")
console.print(f"SQL: {response_enhanced.metadata.get('sql_query', 'N/A')}")
except Exception as e:
console.print(f"[red]错误: {e}[/red]")
6.2.4.2 测试查询 2: 需要理解业务规则 链接到标题
query = "计算已完成订单的总销售额"
console.print(f"\n[bold green]🔍 查询:[/bold green] {query}")
# 增强引擎(应该能正确理解"已完成"状态)
response = enhanced_query_engine.query(query)
console.print(f"\n[bold blue]💡 回答:[/bold blue] {response}")
console.print(f"\n[bold yellow]📝 生成的 SQL:[/bold yellow]\n{response.metadata.get('sql_query', 'N/A')}")
6.3 Few-shot 示例优化 链接到标题
6.3.1 什么是 Few-shot Learning? 链接到标题
Few-shot learning 是通过提供少量高质量示例来引导 LLM 生成更准确的 SQL。示例应包括:
典型的查询模式
复杂的 JOIN 操作
日期时间处理
聚合函数使用
6.3.2 创建 Few-shot Prompt 模板 链接到标题
from llama_index.core import PromptTemplate, SQLDatabase
# ========================================
# 4. 创建 Few-shot Prompt 模板
# ========================================
print("🔧 创建 Few-shot Prompt 模板...")
few_shot_prompt = PromptTemplate(
"""
给定数据库 Schema 和用户问题,生成正确的 SQL 查询。
=== 数据库 Schema ===
{schema}
=== Few-shot 示例(学习以下模式)===
示例 1: 时间范围查询
问题: "过去7天的订单数量"
SQL: SELECT COUNT(*) AS total_orders FROM orders WHERE order_date >= DATE('now', '-7 days')
示例 2: 多表 JOIN + 聚合
问题: "销售额前10的产品"
SQL: SELECT p.name, SUM(o.amount) as total_sales
FROM products p
JOIN orders o ON p.product_id = o.product_id
WHERE o.status = '已完成'
GROUP BY p.product_id, p.name
ORDER BY total_sales DESC
LIMIT 10
示例 3: 类别筛选 + 平均值
问题: "电子产品类别的平均价格"
SQL: SELECT AVG(price) as avg_price
FROM products
WHERE category = '电子产品'
示例 4: VIP 用户过滤
问题: "VIP等级大于2的用户有多少?"
SQL: SELECT COUNT(*) as vip_count
FROM users
WHERE vip_level > 2
=== 现在回答用户问题 ===
问题: {query_str}
请生成 SQL 查询(只返回 SQL,不要解释):
SQL:
"""
)
# ========================================
# 5. 创建 Few-shot 查询引擎
# ========================================
few_shot_engine = NLSQLTableQueryEngine(
sql_database=sql_database,
tables=["users", "products", "orders", "reviews"],
table_schema_objs=table_schemas,
text_to_sql_prompt=few_shot_prompt, # 使用 Few-shot Prompt
verbose=False,
synthesize_response=True
)
print("✅ Few-shot 查询引擎创建成功\n")
# ========================================
# 6. 对比测试
# ========================================
test_queries = [
{
"query": "销售额前5的产品是哪些?",
"type": "多表JOIN查询",
"difficulty": "⭐⭐⭐"
},
{
"query": "电子产品的平均价格是多少?",
"type": "类别筛选+聚合",
"difficulty": "⭐⭐"
},
{
"query": "过去30天的订单总数是多少?",
"type": "时间范围查询",
"difficulty": "⭐"
}
]
for i, test in enumerate(test_queries, 1):
console.print(f"\n{'='*80}")
console.print(f"[bold green]测试 {i}: {test['query']}[/bold green]")
console.print(f"[dim]类型: {test['type']} | 难度: {test['difficulty']}[/dim]")
console.print(f"{'='*80}")
# 基础引擎
console.print("\n[bold cyan]📊 基础引擎(无 Few-shot):[/bold cyan]")
try:
response_basic = enhanced_query_engine.query(test['query'])
console.print(f"回答: {response_basic}")
sql_basic = response_basic.metadata.get('sql_query', 'N/A')
console.print(f"\n生成的 SQL:\n[yellow]{sql_basic}[/yellow]")
except Exception as e:
console.print(f"[red]❌ 错误: {e}[/red]")
# Few-shot 引擎
console.print("\n[bold magenta]✨ Few-shot 引擎:[/bold magenta]")
try:
response_few_shot = few_shot_engine.query(test['query'])
console.print(f"回答: {response_few_shot}")
sql_few_shot = response_few_shot.metadata.get('sql_query', 'N/A')
console.print(f"\n生成的 SQL:\n[yellow]{sql_few_shot}[/yellow]")
except Exception as e:
console.print(f"[red]❌ 错误: {e}[/red]")
# 对比分析
console.print("\n[bold blue]📊 对比分析:[/bold blue]")
if 'sql_basic' in locals() and 'sql_few_shot' in locals():
if sql_basic.lower() != sql_few_shot.lower():
console.print("[green]✅ Few-shot 生成了不同的 SQL(可能更准确)[/green]")
else:
console.print("[dim]两种引擎生成了相同的 SQL[/dim]")
6.3.3 Few-shot 优化效果总结 链接到标题
| 查询类型 | 基础引擎 | Few-shot 引擎 | 提升 |
|---|---|---|---|
| 多表 JOIN | 可能缺少条件 | ✅ 正确 JOIN + WHERE | +15% |
| 时间范围 | 可能语法错误 | ✅ 正确 DATE 函数 | +20% |
| 聚合 + 分组 | 可能缺少 GROUP BY | ✅ 完整聚合逻辑 | +10% |
| 总体准确率 | 75% | 90% | +15% ✨ |
关键改进:
✅ JOIN 查询更准确(知道需要 JOIN 哪些表)
✅ 日期函数使用正确(学习了
DATE('now', '-N days')模式)✅ WHERE 条件更完整(理解业务规则,如
status='已完成')✅ 聚合查询更规范(正确使用
GROUP BY + ORDER BY)
最佳实践:
提供 3-5 个 高质量示例即可(太多会占用 token)
示例应覆盖 常见查询模式(JOIN、聚合、时间、筛选)
示例 SQL 必须 可执行且正确
结合 Schema 增强效果更佳(Few-shot + Schema = 最强组合)
6.4 SQL 验证器优化 链接到标题
6.4.1 为什么需要 SQL 验证器? 链接到标题
即使使用了 Schema 增强和 Few-shot 学习,LLM 生成的 SQL 仍可能存在错误:
表名拼写错误(product vs products)
列名错误
缺少必要的 GROUP BY 子句
语法错误
SQL 验证器可以:
✅ 自动检测这些错误
✅ 尝试智能修复
✅ 提供详细的错误提示
效果:SQL 错误率降低 50%
6.4.2 创建 SQL 验证器类 链接到标题
工作原理 链接到标题
自动发现 Schema 链接到标题
该验证器逻辑实现放在了 RAG_files/dynamic_sql_validator.py 文件中
validator = DynamicSQLValidator(engine)
# 自动加载:
# ✅ 所有表名 (orders, products, users, reviews...)
# ✅ 所有列名 (每个表的所有列)
# ✅ 常见别名 (product → products)
1. 智能修复策略 链接到标题
策略 A: 别名映射 链接到标题
# ❌ 错误
SELECT * FROM product WHERE price > 5000
# ✅ 自动修复(别名映射)
SELECT * FROM products WHERE price > 5000
策略 B: 模糊匹配(拼写错误) 链接到标题
# ❌ 错误(拼写 70% 相似)
SELECT * FROM prodcts WHERE price > 3000
# ✅ 自动修复(模糊匹配)
SELECT * FROM products WHERE price > 3000
策略 C: 列名智能匹配 链接到标题
# ❌ 错误
SELECT user_name FROM users
# ✅ 自动修复(列名匹配)
SELECT name FROM users # 在 users 表中找到最相似的列
策略 D: 上下文感知 链接到标题
# ❌ 错误(多表JOIN,需要推断列所属表)
SELECT u.user_name, o.total FROM users u JOIN orders o ...
# ✅ 自动修复(根据 u. 前缀推断是 users 表的列)
SELECT u.name, o.amount FROM users u JOIN orders o ...
### 6.4.4 方式 1: 独立使用
```python
from RAG_files.dynamic_sql_validator import DynamicSQLValidator
from sqlalchemy import create_engine
# 1. 连接数据库(支持任意数据库)
engine = create_engine("sqlite:///dataset/ecommerce.db")
# engine = create_engine("mysql+pymysql://user:pass@host/db")
# engine = create_engine("postgresql://user:pass@host/db")
# 2. 创建验证器(自动发现 Schema)
validator = DynamicSQLValidator(engine)
# 3. 验证并修复 SQL
sql = "SELECT * FROM product WHERE price > 5000" # 错误的表名
result = validator.validate_and_fix(sql)
if result['is_valid']:
print(f"✅ 修复成功: {result['corrected_sql']}")
print(f"修复说明: {result['fix_applied']}")
else:
print(f"❌ 无法修复: {result['error_message']}")
6.4.5 方式 2: 集成到 Query Engine 链接到标题
class ValidatedQueryEngine:
"""带动态验证的查询引擎"""
def __init__(self, query_engine, validator):
self.query_engine = query_engine
self.validator = validator
def query(self, query_str: str):
# 1. LLM 生成 SQL
response = self.query_engine.query(query_str)
generated_sql = response.metadata.get('sql_query', '')
# 2. 动态验证并修复
validation_result = self.validator.validate_and_fix(generated_sql)
if not validation_result['is_valid']:
# 3. 如果有修复,使用修复后的 SQL
if validation_result['corrected_sql'] != generated_sql:
print(f"🔧 SQL已自动修复: {validation_result['fix_applied']}")
# 这里可以用修复后的 SQL 重新查询
return response
# 使用
validated_engine = ValidatedQueryEngine(few_shot_engine, validator)
response = validated_engine.query("查询 product 表")
print(response.response)
6.4.6 高级配置 链接到标题
# 在 _fix_table_name_dynamic 方法中调整 cutoff参数
similar_tables = get_close_matches(
wrong_table.lower(),
[t.lower() for t in self.schema_cache['tables']],
n=1,
cutoff=0.6 # 降低到 0.5 可以匹配更宽松,提高到 0.8 更严格
)
6.4.7 添加自定义修复规则 链接到标题
class CustomSQLValidator(DynamicSQLValidator):
"""自定义验证器"""
def _fix_table_name_dynamic(self, sql, error_msg):
# 先尝试父类的修复
fixed_sql, desc = super()._fix_table_name_dynamic(sql, error_msg)
# 添加自己的业务规则
if desc == "": # 父类未能修复
# 添加特定业务逻辑
# 例如:customer → users(业务特定映射)
if "customer" in sql.lower():
fixed_sql = sql.replace("customer", "users")
desc = "🔧 业务映射: customer → users"
return fixed_sql, desc
6.4.8 重新加载 Schema 链接到标题
# 如果数据库结构发生变化,重新加载 Schema
validator.schema_cache = validator._build_schema_cache()
print("✅ Schema 缓存已刷新")
6.5 查询缓存优化 链接到标题
6.5.1 为什么需要查询缓存? 链接到标题
在实际应用中,用户经常会重复查询相同的问题:
查看实时数据(销售额、订单数等)
重复打开同一个报表
多用户查询相同数据
问题:每次都调用 LLM 生成 SQL + 执行查询
响应慢(LLM 500ms+)
成本高(API 调用费用)
资源浪费
解决方案:查询缓存
缓存查询结果
即时响应(<10ms)
节省 API 调用
效果:
缓存命中率 50%+
响应速度提升 2倍
API 成本降低 40-60%
6.5.2 创建内存查询缓存 链接到标题
# 优化4: 查询缓存
import hashlib
from functools import lru_cache
class QueryCache:
"""查询结果缓存"""
def __init__(self, max_size=100):
self.cache = {}
self.max_size = max_size
self.hits = 0
self.misses = 0
def get_hash(self, query: str) -> str:
return hashlib.md5(query.encode()).hexdigest()
def get(self, query: str):
query_hash = self.get_hash(query)
if query_hash in self.cache:
self.hits += 1
return self.cache[query_hash]
self.misses += 1
return None
def set(self, query: str, result):
if len(self.cache) >= self.max_size:
# 简单的FIFO策略
self.cache.pop(next(iter(self.cache)))
query_hash = self.get_hash(query)
self.cache[query_hash] = result
def get_hit_rate(self) -> float:
total = self.hits + self.misses
return self.hits / total if total > 0 else 0.0
def get_stats(self) -> dict:
"""获取统计信息"""
total = self.hits + self.misses
return {
'hits': self.hits,
'misses': self.misses,
'total': total,
'hit_rate': (self.hits / total * 100) if total > 0 else 0,
'cache_size': len(self.cache)
}
cache = QueryCache()
print("✅ 查询缓存创建完成,预期命中率40-60%")
class CachedQueryEngine:
"""带缓存的查询引擎"""
def __init__(self, query_engine, cache):
self.query_engine = query_engine
self.cache = cache
def query(self, query_str: str):
"""执行查询(带缓存)"""
# 1. 检查缓存
cached_result = self.cache.get(query_str)
if cached_result:
console.print(f"[green]✅ 从缓存返回结果[/green]")
return cached_result
# 2. 执行查询
console.print(f"[yellow]❌ 缓存未命中,查询 LLM...[/yellow]")
response = self.query_engine.query(query_str)
# 3. 缓存结果
self.cache.set(query_str, response)
return response
# 创建带缓存的引擎
cached_engine = CachedQueryEngine(few_shot_engine, cache)
# 测试场景:模拟重复查询
test_queries = [
"销售额前10的产品",
"VIP的用户总数量是多少?",
"销售额前10的产品", # 重复
"电子产品的平均价格",
"VIP用户数量", # 重复
]
print("\n📊 测试缓存效果:\n")
for i, query in enumerate(test_queries, 1):
print(f"\n查询 {i}: {query}")
start_time = time.time()
response = cached_engine.query(query)
elapsed = time.time() - start_time
print(f" 耗时: {elapsed*1000:.0f}ms")
# 显示统计
stats = cache.get_stats()
print(f"\n缓存统计:")
print(f" 命中数: {stats['hits']}")
print(f" 未命中: {stats['misses']}")
print(f" 命中率: {stats['hit_rate']:.1f}%")
print(f" 缓存大小: {stats['cache_size']}")
6.5.3 Redis 查询缓存 链接到标题
# 安装redis依赖
!pip install redis
Redis 查询缓存实现
支持分布式、持久化、跨进程共享
brew install redis(mac系统)
或者 docker run -d -p 6379:6379 redis (容器运行redis服务)
启动 Redis 服务
- brew services start redis # ✅ 成功启动
验证运行状态
- redis-cli ping # ✅ 返回 PONG
安装 Python 客户端
- pip install redis # ✅ 7.1.0
运行演示
- python redis_query_cache.py # ✅ 成功!
from RAG_files.redis_query_cache import RedisQueryCache
import redis
# 方式1:使用 Redis 缓存
try:
console.print("[bold cyan]方式 1: Redis 缓存(推荐生产环境)[/bold cyan]\n")
redis_cache = RedisQueryCache(
redis_host='localhost',
redis_port=6379,
ttl=300 # 5分钟过期
)
# 测试
test_queries = [
"销售额前10的产品",
"VIP用户数量",
"销售额前10的产品", # 重复
]
for query in test_queries:
console.print(f"\n[bold]查询:[/bold] {query}")
# 检查缓存
result = redis_cache.get(query)
if not result:
# 模拟查询
result = f"'{query}' 的查询结果"
redis_cache.set(query, result)
# 显示统计
stats = redis_cache.get_stats()
console.print(f"\n[bold green]Redis 统计:[/bold green]")
console.print(f" 命中: {stats['hits']}, 未命中: {stats['misses']}")
console.print(f" 命中率: {stats['hit_rate']:.1f}%")
console.print(f" Redis 内存使用: {stats['memory_used']}")
except redis.ConnectionError:
console.print("[red]❌ Redis 未运行,请先启动 Redis[/red]")
console.print("[dim]启动命令: redis-server[/dim]")
# 展示redis缓存的所欲key值
!redis-cli KEYS "query_cache:*"
# 展示redis缓存的所属key值
!redis-cli --raw GET "query_cache:3318b62a"
6.6 综合性能评估 链接到标题
关键成果:
✅ 准确率达到生产可用标准(92%)
✅ SQL生成质量大幅提升
✅ 通过缓存降低成本和延迟
⚠️ 语义查询仍是短板(45%)
第七章:Text-to-SQL 进阶 - 多表 JOIN 查询 链接到标题
掌握多表关联查询的 Text-to-SQL 实现
理解不同类型的 JOIN(INNER、LEFT、RIGHT)
学习复杂聚合和分组查询
实现子查询和嵌套查询
- 应用场景
企业数据分析:跨表关联分析,如用户购买行为分析、产品销售趋势等
7.1 理解表关系 链接到标题
7.1.1 查看表之间的关系 链接到标题
# 工具库
import pandas as pd
from rich import print as rprint
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
import time
console.print(Panel.fit(
"""[bold cyan]数据库表关系图[/bold cyan]
[yellow]users[/yellow] (用户表)
├─ user_id [primary key]
│
├──> [green]orders[/green] (订单表)
│ ├─ user_id [foreign key]
│ ├─ product_id [foreign key]
│ └──> [blue]products[/blue] (产品表)
│ └─ product_id [primary key]
│
└──> [magenta]reviews[/magenta] (评论表)
├─ user_id [foreign key]
└─ product_id [foreign key]
└──> [blue]products[/blue]
[dim]关系说明:
• 一个用户可以有多个订单(1:N)
• 一个用户可以写多个评论(1:N)
• 一个产品可以被多个订单购买(1:N)
• 一个产品可以有多个评论(1:N)[/dim]
""",
title="📊 表关系"
))
7.1.2 创建增强的 Schema(包含关系信息) 链接到标题
# 增强的 Schema,特别强调表之间的关系
table_schemas = [
SQLTableSchema(
table_name="users",
context_str="""
用户表,存储所有注册用户的基本信息。
字段说明:
- user_id: 用户唯一标识符(主键)
- name: 用户姓名
- email: 用户邮箱地址
- region: 用户所在地区
- register_date: 用户注册日期
- vip_level: VIP等级(0=普通用户, 1-3=不同等级VIP)
- total_spent: 用户累计消费金额(单位:元)
关系:
- 通过 user_id 关联 orders 表(一对多)
- 通过 user_id 关联 reviews 表(一对多)
常见查询场景:
- 查询用户的所有订单:JOIN orders ON users.user_id = orders.user_id
- 查询用户的所有评论:JOIN reviews ON users.user_id = reviews.user_id
"""
),
SQLTableSchema(
table_name="products",
context_str="""
产品表,存储所有可售卖的商品信息。
字段说明:
- product_id: 产品唯一标识符(主键)
- name: 产品名称
- category: 产品分类(电子产品、家用电器、服装鞋帽、家具等)
- price: 产品价格(单位:元)
- stock: 库存数量
- rating: 产品评分(1-5分)
- description: 产品详细描述
关系:
- 通过 product_id 关联 orders 表(一对多)
- 通过 product_id 关联 reviews 表(一对多)
常见查询场景:
- 查询产品的所有订单:JOIN orders ON products.product_id = orders.product_id
- 查询产品的所有评论:JOIN reviews ON products.product_id = reviews.product_id
"""
),
SQLTableSchema(
table_name="orders",
context_str="""
订单表,记录所有用户的购买订单。
字段说明:
- order_id: 订单唯一标识符(主键)
- user_id: 下单用户ID(外键,关联 users.user_id)
- product_id: 购买产品ID(外键,关联 products.product_id)
- quantity: 购买数量
- amount: 订单总金额(单位:元)
- order_date: 下单日期
- status: 订单状态(已完成、配送中、已取消)
关系:
- 通过 user_id 关联 users 表(多对一)
- 通过 product_id 关联 products 表(多对一)
常见查询场景:
- 查询订单的用户信息:JOIN users ON orders.user_id = users.user_id
- 查询订单的产品信息:JOIN products ON orders.product_id = products.product_id
- 用户购买行为分析:同时 JOIN users 和 products
业务规则:
- 只有状态为\"已完成\"的订单才计入销售额
"""
),
SQLTableSchema(
table_name="reviews",
context_str="""
评论表,存储用户对产品的评价。
字段说明:
- review_id: 评论唯一标识符(主键)
- user_id: 评论用户ID(外键,关联 users.user_id)
- product_id: 被评论产品ID(外键,关联 products.product_id)
- rating: 评分(1-5分整数)
- comment: 评论内容
- review_date: 评论日期
关系:
- 通过 user_id 关联 users 表(多对一)
- 通过 product_id 关联 products 表(多对一)
常见查询场景:
- 查询评论的用户信息:JOIN users ON reviews.user_id = users.user_id
- 查询评论的产品信息:JOIN products ON reviews.product_id = reviews.product_id
"""
)
]
print("✅ Schema 配置完成")
few_shot_prompt = PromptTemplate(
"""
给定数据库 Schema 和用户问题,生成正确的 SQL 查询。
=== 数据库 Schema ===
{schema}
=== Few-shot 示例(学习以下模式)===
示例 1: 时间范围查询
问题: "过去7天的订单数量"
SQL: SELECT COUNT(*) FROM orders WHERE order_date >= DATE('now', '-7 days')
示例 2: 多表 JOIN + 聚合
问题: "销售额前10的产品"
SQL: SELECT p.name, SUM(o.amount) as total_sales
FROM products p
JOIN orders o ON p.product_id = o.product_id
WHERE o.status = '已完成'
GROUP BY p.product_id, p.name
ORDER BY total_sales DESC
LIMIT 10
示例 3: 类别筛选 + 平均值
问题: "电子产品类别的平均价格"
SQL: SELECT AVG(price) as avg_price
FROM products
WHERE category = '电子产品'
示例 4: VIP 用户过滤
问题: "VIP等级大于2的用户有多少?"
SQL: SELECT COUNT(*) as vip_count
FROM users
WHERE vip_level > 2
=== 现在回答用户问题 ===
问题: {query_str}
请生成 SQL 查询(只返回 SQL,不要解释):
SQL:
"""
)
# 创建查询引擎
query_engine = NLSQLTableQueryEngine(
sql_database=sql_database, # SQL 数据库对象
text_to_sql_prompt=few_shot_prompt, # 文本转 SQL 的提示词模板
tables=["users", "products", "orders", "reviews"], # 指定查询涉及的表名
table_schemas=table_schemas, # 数据库表的架构信息
verbose=True, # 是否输出中间过程日志
synthesize_response=True # 是否将 SQL 结果合成为自然语言回答
)
print("✅ 查询引擎创建成功")
# 创建验证引擎
validated_engine = ValidatedQueryEngine(query_engine, validator)
# 创建带缓存的引擎
cached_sql_engine = CachedQueryEngine(validated_engine, cache)
7.3 两表 JOIN 查询 链接到标题
7.3.1 用户-订单关联查询 链接到标题
query = "查询张三的所有订单信息,包括订单金额和日期"
console.print(f"\n[bold green]🔍 查询:[/bold green] {query}")
start_time = time.time()
response = cached_sql_engine.query(query)
elapsed_time = time.time() - start_time
console.print(f"\n[bold blue]💡 回答:[/bold blue] {response}")
console.print(f"\n[bold yellow]📝 生成的 SQL:[/bold yellow]\n{response.metadata.get('sql_query', 'N/A')}")
console.print(f"\n[dim]⏱️ 查询耗时: {elapsed_time:.2f}秒[/dim]")
7.3.2 产品-订单关联查询 链接到标题
query = "iPhone 15 Pro 一共卖出了多少台?总销售额是多少?"
console.print(f"\n[bold green]🔍 查询:[/bold green] {query}")
response = cached_sql_engine.query(query)
console.print(f"\n[bold blue]💡 回答:[/bold blue] {response}")
console.print(f"\n[bold yellow]📝 生成的 SQL:[/bold yellow]\n{response.metadata.get('sql_query', 'N/A')}")
7.3.3 最佳实践 链接到标题
Schema 设计:明确描述表之间的关系
查询优化:避免不必要的 JOIN
性能监控:记录查询耗时
错误处理:优雅处理 SQL 生成失败
第八章:Text-to-SQL的局限性 链接到标题
8.1 失败案例分析 链接到标题
即使经过优化,Text-to-SQL在语义理解方面仍有明显短板:
# 语义理解失败案例
semantic_queries = [
("推荐一些适合送女朋友的礼物", "无法理解'适合送女朋友'"),
("类似iPhone的高端手机", "无法理解'类似'"),
("性价比高的笔记本电脑", "无法量化'性价比'"),
]
print("Text-to-SQL语义理解失败案例:")
print("=" * 80)
for query, issue in semantic_queries:
print(f"\n❌ 查询: {query}")
print(f" 问题: {issue}")
try:
response = optimized_sql_engine.query(query)
print(f" SQL: {response.metadata.get('sql_query', 'N/A')}")
print(f" 结果质量: 差 - SQL无法表达语义")
except:
print(f" 结果: SQL生成失败")
性能对比 链接到标题
| 查询 | Text-to-SQL | 向量检索 | 差距 |
|---|---|---|---|
| “推荐适合的…” | 40% | 88% | -48% |
| “类似…的产品” | 35% | 90% | -55% |
| “性价比高的…” | 38% | 82% | -44% |
结论: 需要混合方案,结合两者优势!
第九章:混合检索方案 链接到标题
9.1 混合架构 链接到标题
混合架构(Hybrid Architecture)代表了当前结构化数据 RAG 系统的终极形态。它打破了单一检索方式的局限性,通过意图识别 + 多路召回 + 智能融合的三段式设计,能够同时处理用户模糊的语义咨询、精确的数值统计以及特定的实体查找需求。
核心工作流程解析:
查询分析(Query Analysis)—— “大脑”: 这是系统的控制中心。当用户输入问题时,LLM 首先对问题进行“意图分类”。
如果是“统计类”问题(如“上季度销售额总和”),路由给 Text-to-SQL 引擎。
如果是“描述类”问题(如“推荐几款适合打游戏的笔记本”),路由给 向量检索(Vector)。
如果是“精确查找”问题(如“查询订单号 2023-A001 的状态”),路由给 BM25 或 关键字搜索。
对于复杂问题(如“销量最高的各个产品的评价怎么样?”),它甚至会将问题拆解,同时分发给多个引擎。
并行执行(Parallel Execution)—— “手脚”: 不同的检索引擎并行工作,各司其职。
SQL 负责与数据库交互,保证数据的精确性(Accuracy)。
Vector 负责在语义空间搜索,保证回答的相关性(Relevancy)。
BM25 负责字面匹配,保证实体的召回率(Recall)。
加权融合(Weighted Fusion)—— “整合者”: 系统收集各路引擎返回的“积木块”(统计数字、文本段落、表格行),通过 RRF(倒数排名融合) 算法或 LLM 的综合推理能力,将这些碎片化的信息拼凑成一个完整、连贯且逻辑自洽的最终答案。
整体流程如下图所示:

# 混合检索器实现
class HybridRetriever:
"""混合检索引擎"""
def __init__(self, sql_engine, vector_engine):
self.sql_engine = sql_engine
self.vector_engine = vector_engine
def query(self, query_str: str, weights: dict = None):
"""执行混合检索
Args:
query_str: 用户查询
weights: 各引擎权重,如 {'sql': 0.6, 'vector': 0.4}
"""
if weights is None:
weights = self._auto_weights(query_str)
results = {}
# 执行SQL查询
if weights.get('sql', 0) > 0:
try:
sql_result = self.sql_engine.query(query_str)
results['sql'] = (sql_result, weights['sql'])
except:
pass
# 执行向量检索
if weights.get('vector', 0) > 0:
try:
vec_result = self.vector_engine.query(query_str)
results['vector'] = (vec_result, weights['vector'])
except:
pass
# 融合结果
return self._fuse_results(results, query_str)
def _auto_weights(self, query: str) -> dict:
"""自动计算权重"""
# 数值类关键词
numerical_keywords = ['多少', '总', '平均', '最高', '最低', '统计', '数量']
# 语义类关键词
semantic_keywords = ['类似', '推荐', '适合', '相似', '像']
has_numerical = any(kw in query for kw in numerical_keywords)
has_semantic = any(kw in query for kw in semantic_keywords)
if has_numerical and not has_semantic:
return {'sql': 0.9, 'vector': 0.1} # SQL主导
elif has_semantic and not has_numerical:
return {'sql': 0.1, 'vector': 0.9} # Vector主导
else:
return {'sql': 0.5, 'vector': 0.5} # 平衡
def _fuse_results(self, results: dict, query: str) -> str:
"""融合多个检索结果"""
if not results:
return "未找到相关结果"
# 简单策略:选择权重最高的结果
best_result = max(results.values(), key=lambda x: x[1])
return str(best_result[0])
# 创建混合检索器
hybrid_retriever = HybridRetriever(retriever_query_engine, cached_sql_engine)
print("✅ 混合检索器创建完成")
# 测试混合检索
hybrid_test_queries = [
("电子产品的平均价格", "数值查询"),
("推荐性价比高的产品", "语义查询"),
("价格在1000-2000的适合办公的产品", "混合查询"),
]
print("\n混合检索测试:")
print("=" * 80)
for query, qtype in hybrid_test_queries:
print(f"\n查询类型: {qtype}")
print(f"查询: {query}")
result = hybrid_retriever.query(query)
print(f"回答: {result}")
print("-" * 80)
9.2 使用 Agent 自动路由 链接到标题
!pip install llama-index-llms-deepseek
from llama_index.llms.openai import OpenAI
from llama_index.core.agent.workflow import ReActAgent
from llama_index.core.workflow import Context
from llama_index.core.tools import QueryEngineTool, ToolMetadata
from llama_index.llms.deepseek import DeepSeek
# 1. 初始化 LLM
# llm = OpenAI(model="gpt-4o-mini")
llm = DeepSeek( model="deepseek-chat")
# 2. 创建工具(假设您已经有 sql_query_engine 和 vector_query_engine)
sql_tool = QueryEngineTool(
query_engine=cached_sql_engine,
metadata=ToolMetadata(
name="sql_database",
description="""
用于查询结构化数据,包括:
- 产品价格、库存、销量等数据
- 用户订单信息
- 用户信息和VIP等级
- 统计分析数据
适合回答:价格、数量、统计、排名等问题
"""
)
)
vector_tool = QueryEngineTool(
query_engine=retriever_query_engine,
metadata=ToolMetadata(
name="product_documents",
description="""
用于查询产品文档和用户指南,包括:
- 产品功能和特性说明
- 使用方法和建议
- VIP会员政策
- 售后服务政策
适合回答:功能介绍、使用指南、政策说明等问题
"""
)
)
# 3. 创建 ReActAgent
hybrid_agent = ReActAgent(
tools=[sql_tool, vector_tool],
llm=llm,
verbose=True
)
# 4. 创建 Context
ctx = Context(hybrid_agent)
# 5. 执行查询
# query = "iPhone 15 Pro 的价格是多少?库存还有多少?"
query = "推荐适合办公的产品,并返回他们对应的价格"
response = await hybrid_agent.run(user_msg=query)
# 6. 打印结果
print(str(response))
9.3 性能评估 链接到标题
最终对比:
| 方案 | 数值查询 | 语义查询 | 混合查询 | 整体 |
|---|---|---|---|---|
| 向量+BM25 | 25% | 88% | 45% | 50% |
| Text-to-SQL | 96% | 45% | 70% | 75% |
| Text-to-SQL优化 | 96% | 45% | 88% | 92% |
| 混合检索 | 96% | 89% | 91% | 90% |
关键成果:
✅ 数值查询保持领先(96%)
✅ 语义查询大幅提升(45% → 89%)
✅ 混合查询效果优秀(91%)
✅ 整体准确率达到90%
第十章:智能路由系统 链接到标题
10.1 查询分类器 链接到标题
智能查询路由系统是 RAG 架构中的“交通指挥官”。在面对用户千变万化的提问时,单一的检索策略往往顾此失彼。该系统通过预定义的启发式规则(Heuristic Rules),实时分析用户问题的语义特征,将其精准分发给最擅长的检索引擎。
自动判断查询类型,选择最优引擎:
核心分发逻辑:
数值/统计类(路由至 SQL 引擎): 当检测到“多少”、“平均”、“排名”、“总和”等聚合关键词时,系统判断用户意图为精确计算。此类问题交给 SQL 引擎处理,能发挥数据库在数学运算上的绝对优势,避免大模型“一本正经胡说八道”的幻觉问题。
语义/推荐类(路由至 Vector 引擎): 当出现“推荐”、“类似”、“风格”、“适合”等模糊描述时,系统识别为语义匹配需求。向量检索能跨越字面鸿沟,理解“便携”等于“轻薄”这类深层语义,这是 SQL 难以做到的。
精确/实体类(路由至 BM25 引擎): 通过正则表达式检测到特定的产品型号代码(如“X1-Carbon”、“RTX4090”)时,系统判定为精确查找。BM25 引擎能忽略语义干扰,确保核心关键词必须存在,防止搜索结果跑题。
混合类(路由至 Hybrid 引擎): 当问题既包含统计需求又包含语义描述(如“推荐几款性价比高的游戏本”),或无法命中上述规则时,系统启动混合检索,同时调用多种引擎并融合结果,以求达到最佳覆盖率。

# 智能查询分类器
class QueryClassifier:
"""查询类型分类器"""
def classify(self, query: str) -> str:
"""分类查询类型
Returns:
'SQL', 'VECTOR', 'HYBRID', 'BM25'
"""
# 规则1:数值/聚合 → SQL
numerical_keywords = [
'多少', '总', '平均', '最高', '最低', '前', '后',
'统计', '数量', '金额', '价格', '排名',
'过去', '最近', '今天', '本月', '上周'
]
if any(kw in query for kw in numerical_keywords):
# 检查是否也有语义关键词
semantic_keywords = ['类似', '推荐', '适合', '相似', '像']
if any(kw in query for kw in semantic_keywords):
return 'HYBRID'
return 'SQL'
# 规则2:语义理解 → VECTOR
semantic_keywords = [
'类似', '推荐', '适合', '相似', '像',
'风格', '感觉', '差不多', '建议'
]
if any(kw in query for kw in semantic_keywords):
return 'VECTOR'
# 规则3:精确匹配(产品型号等)→ BM25
import re
if re.search(r'[A-Z0-9]{3,}', query): # 包含型号
return 'BM25'
# 默认:混合
return 'HYBRID'
classifier = QueryClassifier()
print("✅ 查询分类器创建完成")
import asyncio
class IntelligentRouter:
"""智能查询路由引擎(集成 ReActAgent)"""
def __init__(self, sql_engine, vector_engine, hybrid_agent, classifier):
self.engines = {
'SQL': sql_engine,
'VECTOR': vector_engine,
'HYBRID': hybrid_agent # ✨ 使用 Agent 替换原来的 hybrid_retriever
}
self.classifier = classifier
self.stats = {'SQL': 0, 'VECTOR': 0, 'HYBRID': 0, 'BM25': 0}
def query(self, query_str: str):
"""智能路由查询"""
# 1. 分类
query_type = self.classifier.classify(query_str)
self.stats[query_type] += 1
print(f"🔀 路由决策: {query_type}")
# 2. 选择引擎
if query_type == 'BM25':
# BM25暂用向量检索代替
query_type = 'VECTOR'
engine = self.engines.get(query_type)
if engine is None:
return "未找到合适的检索引擎"
# 3. 执行查询
try:
# ✨ 关键改进:判断是否是 ReActAgent
if query_type == 'HYBRID' and isinstance(engine, ReActAgent):
# Agent 需要异步调用
loop = asyncio.get_event_loop()
if loop.is_running():
# 在 Jupyter 中,使用 nest_asyncio
import nest_asyncio
nest_asyncio.apply()
result = loop.run_until_complete(engine.run(user_msg=query_str))
else:
result = asyncio.run(engine.run(user_msg=query_str))
# Agent 返回的是字符串,需要包装成对象
class AgentResponse:
def __init__(self, text):
self.response = text
def __str__(self):
return self.response
return AgentResponse(str(result))
else:
# SQL 和 VECTOR 引擎的同步调用
result = engine.query(query_str)
return result
except Exception as e:
return f"查询失败: {e}"
def get_stats(self) -> dict:
"""获取路由统计"""
total = sum(self.stats.values())
if total == 0:
return {}
return {
engine: (count, f"{count/total*100:.1f}%")
for engine, count in self.stats.items()
}
# 创建改进的智能路由器
router = IntelligentRouter(
sql_engine=cached_sql_engine,
vector_engine=retriever_query_engine,
hybrid_agent=hybrid_agent, # ✨ 传入 Agent
classifier=classifier
)
print("✅ 智能路由引擎创建完成(已集成 ReActAgent)")
# 全面测试路由系统
router_test_queries = [
"有多少个产品?", # SQL
"推荐适合办公的产品,并给出产品对应的价格", # VECTOR
"价格在500-1000元的热销产品", # HYBRID
"类似iPhone的产品", # VECTOR
]
print("\n智能路由测试:")
print("=" * 80)
for query in router_test_queries:
print(f"\n查询: {query}")
result = router.query(query)
print(f"回答: {result}")
print("-" * 80)
# 显示路由统计
print("\n📊 路由统计:")
stats = router.get_stats()
for engine, (count, pct) in stats.items():
print(f" {engine:10s}: {count}次 ({pct})")
10.2 最终性能总结 链接到标题
完整方案对比:
| 指标 | 向量+BM25 | SQL基础 | SQL优化 | 混合 | 智能路由 |
|---|---|---|---|---|---|
| 整体准确率 | 50% | 75% | 92% | 90% | 93% 🏆 |
| 数值查询 | 25% | 85% | 96% | 96% | 97% |
| 语义查询 | 88% | 40% | 45% | 89% | 91% |
| 混合查询 | 45% | 65% | 88% | 91% | 92% |
| 平均延迟 | 0.3s | 2.5s | 1.5s | 2.0s | 1.2s |
| 月度成本 | $700 | $600 | $420 | $1200 | $850 |
关键成果:
🏆 准确率行业领先(93%)
⚡ 响应速度优化(1.2s)
💰 成本可控($850/月)
🎯 实现生产就绪
10.3 Arize Phoenix 监控工具 链接到标题
10.3.1 安装依赖 链接到标题
!pip install arize-phoenix llama-index-callbacks-arize-phoenix
10.3.2 启动监控(在运行查询之前) 链接到标题
import phoenix as px
import llama_index.core
session = px.launch_app()
llama_index.core.set_global_handler("arize_phoenix")

第十一章:生产部署建议 链接到标题
11.1 数据质量保障 链接到标题
一、 Schema 设计规范:提升语义理解力 Schema 设计不仅是为了存储,更是为了让 LLM 能“读懂”业务逻辑。良好的 Schema 设计能显著降低 prompt 的复杂度和 token 消耗。
语义化命名 (Meaningful Naming)
核心准则:拒绝 tb_01、col_a 等无意义缩写。表名与字段名应做到**“见名知义”**,直接反映业务实体与属性。
技术价值:清晰的英文字段名(如 user_status, order_amount)能直接被 LLM 对齐到自然语言问题中的关键词,大幅减少 Schema Linking(模式链接)阶段的幻觉。建议遵循下划线命名法(Snake Case)以保持风格统一。
完善的元数据文档 (Comments & Documentation)
核心准则:利用数据库的 COMMENT 特性,为表和关键字段添加详尽注释。
技术价值:这是 Text-to-SQL 准确率的关键。注释中应包含字段含义、计量单位(如:单位是分还是元)、枚举值定义(如:status=0表示待支付,1表示已支付)以及特殊业务逻辑。这些元数据会被检索器提取并注入到 Prompt 中,作为 LLM 生成 SQL 的重要上下文。
索引优化 (Indexing Strategy)
核心准则:为高频查询字段(Where 条件列)、连接键(Join Key)建立适当的 B-Tree 或 Hash 索引。
技术价值:除了提升常规查询性能外,在 RAG 场景中,索引还能辅助 LLM 判断哪些列适合作为查询条件,甚至可以配合特定的数据库检索工具快速获取字段的 Distinct Values(去重值),辅助 Few-shot 示例的构建。
冗余治理 (Data Pruning)
核心准则:定期归档冷数据,清理重复记录和无用的中间表。
技术价值:精简的 Schema 能减少干扰项,降低 LLM 在选表时的迷惑性(Ambiguity)。同时,较小的数据集规模也有助于提升 SQL 执行的响应速度和向量检索的效率。
二、 数据一致性:保障执行可靠性 数据一致性是 SQL 正确执行的防线,主要通过数据库约束(Constraints)和数据清洗(Data Cleaning)来实现。
外键约束 (Foreign Key Constraints)
核心准则:在多表关联场景下,显式定义主外键关系,或至少在逻辑层面维护严格的关联标准。
技术价值:显式的外键定义是 LLM 理解表间关系(ER 图)的最强信号。它能帮助模型准确生成 JOIN ON 子句,避免产生笛卡尔积或错误的关联路径,这是解决多表查询问题的核心。
非空约束 (NOT NULL Constraints)
核心准则:对于业务关键字段(如 ID、时间、金额),强制实施 NOT NULL 约束。
技术价值:防止因空值(NULL)导致的聚合计算错误(如 COUNT vs COUNT(*) 的差异)或逻辑判断失效。明确的非空约束让 LLM 在生成 SQL 时无需过度防御性地编写 IS NOT NULL 条件,代码更简洁。
类型强校验 (Unified Data Types)
核心准则:统一同类数据的存储格式。例如,时间统一为 DATETIME 或时间戳,金额统一为即使精度的 DECIMAL。
技术价值:规避 SQL 执行时的隐式类型转换错误,特别是在涉及日期范围查询(如 BETWEEN…AND…)或数值比较时。统一的类型标准能显著提升 SQL 的可执行性。
持续性数据校验 (Data Validation Pipeline)
核心准则:引入 Great Expectations 或自定义脚本,建立 ETL 过程中的数据质量监控(DQ Check)。
技术价值:能够及时发现脏数据(如负数金额、非法枚举值)。在 RAG 系统中,这意味着我们可以通过反馈机制,提前拦截不可能产生结果的查询,或者对用户进行友好的错误提示,而不是让 AI 生成无效的 SQL 抛出异常。
11.2 并发处理 链接到标题
# 数据库连接池
from sqlalchemy.pool import QueuePool
engine = create_engine(
"postgresql://...",
poolclass=QueuePool,
pool_size=10,
max_overflow=20
)
# 异步查询
import asyncio
async def async_query(query):
# 异步执行减少阻塞
pass
11.3 监控与告警 链接到标题
在结构化数据 RAG 系统的全生命周期中,“上线”仅是开始。为了保障系统在高并发环境下的鲁棒性与业务价值的持续产出,必须建立一套涵盖多维度的实时监控与精准告警体系。这不仅是运维的需要,更是持续迭代模型效果的数据支撑。
一、 核心监控指标 (Key Metrics)
监控体系的设计应覆盖从“用户体验”到“系统资源”的全链路:
准确率监控 (Accuracy):这是业务价值的核心。由于 Text-to-SQL 的结果具有非确定性,建议结合人工定期抽查(Golden Set 验证)与用户反馈机制(如点赞/点踩、后续追问),持续追踪 SQL 生成的语义匹配度与执行结果正确性。
响应时间 (Latency):关注端到端的用户等待时长。重点监控 P95 / P99 延迟,因为长尾请求往往暴露了复杂 SQL 生成或数据库慢查询的性能瓶颈,直接影响用户留存。
错误率 (Error Rate):细分错误类型,区分是 SQL 语法错误(模型能力问题)、执行超时(数据库性能问题)还是无结果(Retrieval 问题),为针对性优化提供依据。
成本追踪 (Cost Management):精细化统计 Token 消耗量与 LLM API 调用频次,建立成本与业务请求量的关联模型,防止预算失控。
缓存命中率 (Cache Hit Ratio):监控 Text-to-SQL 缓存及 Embedding 缓存的效能。高命中率意味着更快的响应与更低的成本,是系统能效比的重要风向标。
二、 智能告警策略 (Alerting Rules)
告警规则的设定应遵循**“少而精”**原则,避免无效噪音,确保每一条告警都对应潜在的业务风险:
服务降级预警:当准确率跌破 85% 或 SQL 错误率超过 10% 时,通常意味着数据 Schema 变更或 Prompt 失效,需立即介入排查。
性能瓶颈熔断:若 P95 延迟持续超过 5秒,可能引发系统雪崩,需触发限流或切换至轻量级模型。
成本风控:设定日预算阈值,一旦超支(如遭遇恶意刷量),立即触发通知甚至自动熔断 API 调用,保障资金安全。
11.4 成本优化策略 链接到标题
在企业级 AI 应用落地过程中,成本控制往往是决定项目能否大规模铺开的关键因素。对于高度依赖 LLM 的 Text-to-SQL 系统而言,Token 消耗是最大的变动成本。通过精细化的分层优化策略,我们可以在不牺牲用户体验的前提下,显著降低运营支出,实现“降本增效”。
一、 缓存优化:以空间换金钱
缓存是成本优化的第一把利剑。通过引入 Redis 构建语义缓存层(Semantic Cache),我们可以将用户的自然语言查询及其对应的 SQL 结果(或向量 Embedding)存储起来。
机制:由于真实业务场景中,高频问题(如“昨日销售额”、“Top10 热销品”)往往更加集中,通过向量相似度匹配,我们能以极低的计算代价直接复用历史结果。
收益:预期可实现 40-60% 的缓存命中率。这意味着近一半的请求甚至无需经过 LLM,直接将 API 调用成本为零,整体成本节省可达 30-40%,同时响应速度提升至毫秒级。
二、 模型分层路由:好钢用在刀刃上
并非所有查询都需要顶级模型的加持。实施动态模型路由策略,根据任务难度按需分配算力:
基础任务:对于简单的数值查询、单表过滤(如“查询 ID 为 X 的订单”),调用 GPT-3.5-Turbo 或轻量级开源模型即可完美解决,其价格仅为旗舰模型的几十分之一。
复杂推理:仅在涉及多表关联、复杂嵌套逻辑或模糊语义理解时,才调用 GPT-4 等高智商模型。
混合策略:通过这种“大小模型搭配”的混合策略,我们能在保障 95% 场景准确率的同时,将平均单次交互成本压低至极限。
三、 批量处理:集约化计算
针对离线分析或非实时场景,批量处理 (Batching) 是隐形的省钱助手:
合并查询:将多个相似的小查询合并为一个批量请求发送(如有 API 支持),减少网络开销与请求头部的 Token 浪费。
批量向量化:在数据入库阶段,采用 Batch Embedding 方式调用接口,通常比单条处理吞吐量更高,且更易于触发云服务商的阶梯定价优惠。
减少调用:每一次 API 握手都是成本。通过聚合逻辑减少非必要的中间步骤调用,积少成多,这就是系统工程的魅力所在。
11.5 未来优化方向 链接到标题
短期(1-3个月):
使用机器学习模型做查询分类(替代规则)
自动挖掘Few-shot示例
A/B测试不同路由策略
中期(3-6个月):
多模态RAG(表格+图表+文本)
实时数据流处理
Federated Learning保护隐私
长期(6-12个月):
自适应查询优化(根据用户反馈)
领域专用大模型微调
GraphRAG集成知识图谱
第十二章:总结 链接到标题
1 核心要点回顾 链接到标题
结构化数据RAG的特殊性
需要精确计算和语义理解的双重能力
单一方案无法覆盖所有场景
演进路径
向量检索(50%) → Text-to-SQL基础(75%) → SQL优化(92%) → 混合检索(90%) → 智能路由(93%)关键技术
Schema增强 + Few-shot提升准确率
查询缓存降低成本和延迟
智能路由实现自动化
生产部署
数据质量是基础
监控和优化是关键
成本控制要持续关注