MVP 小规模 1万文档
SQlite 存储node信息,查询后实时BM25再关键词召回
chroma向量召回
大规模企业 10万文档以下
PostgreSQL 节点+关键词;直接关键词召回
(使用 PostgreSQL 全文检索(tsvector,适合英文/分词后中文)
直接SQL语句 需要提前使用jieba分词)
把 PostgreSQL检索封装成 BaseRetriever
Milvus向量召回
RAG原理
发表评论
MVP 小规模 1万文档
SQlite 存储node信息,查询后实时BM25再关键词召回
chroma向量召回
大规模企业 10万文档以下
PostgreSQL 节点+关键词;直接关键词召回
(使用 PostgreSQL 全文检索(tsvector,适合英文/分词后中文)
直接SQL语句 需要提前使用jieba分词)
把 PostgreSQL检索封装成 BaseRetriever
Milvus向量召回
print("📌 【原始Document文档ID】:", new_docs[0].id_) # fa.txt对应的文档根ID
print("\n📌 【切分后Node节点ID列表】(落盘JSON文件名):")
for node in new_nodes:
print(f"→ 节点ID:{node.id_} | 关联文档ID:{node.metadata['document_id']}")
文档.id_. ==node.metadata['document_id']
node.id_
def load_all_nodes() -> list[BaseNode]:
return [BaseNode.from_dict(json.loads(p.read_text(encoding="utf8")))
for p in NODE_DIR.glob("*.json")]
关键词召回 BM25
这里ODE_DIR.glob("*.json")也可以使用数据库生成一个node的json文件类表 此处缓存数据库查询后返回的node信息比较好
实现精确控制对吧
比如 数据库 查询出
综合部 有2000个node节点。
然后 给出列表
进行bm25
然后再查询
向量召回
使用刚刚数据库查询出来的document_id 来指定metadata['document_id']后检索
再向量召回
def load_all_nodes() -> List[BaseNode]:
"""
从 SQLite 数据库加载所有节点。
假设表 `nodes` 中有字段: id (TEXT), node_json (TEXT)
"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
try:
cursor.execute("SELECT node_json FROM nodes")
rows = cursor.fetchall()
finally:
conn.close()
nodes = []
for (node_json_str,) in rows:
# 将 JSON 字符串解析为 dict,再反序列化为 BaseNode
node_dict = json.loads(node_json_str)
node = BaseNode.from_dict(node_dict)
nodes.append(node)
return nodes
"""
根据 department 值,从 SQLite 查询 metadata 中匹配的节点。
要求表结构:
CREATE TABLE nodes (
id TEXT PRIMARY KEY,
metadata TEXT NOT NULL, -- 存 JSON 字符串,如 '{"department": "综合部", ...}'
node_json TEXT NOT NULL -- 完整 BaseNode 的 JSON
);
"""
def load_all_nodes(department: str) -> List[BaseNode]:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
try:
# 使用 ? 占位符,安全传参
cursor.execute(
"SELECT node_json FROM nodes WHERE json_extract(metadata, '$.department') = ?",
(department,)
)
rows = cursor.fetchall()
finally:
conn.close()
# 反序列化为 BaseNode 列表
nodes = []
for (node_json_str,) in rows:
node_dict = json.loads(node_json_str)
node = BaseNode.from_dict(node_dict)
nodes.append(node)
return nodes
retriever = BM25Retriever.from_defaults(
nodes=nodes,
similarity_top_k=top_k
)
from pathlib import Path
import json
from llama_index.core.schema import TextNode, BaseNode
NODE_DIR = Path("index_storage/nodes")
NODE_DIR.mkdir(parents=True, exist_ok=True)
# ---- 保存 ----
def save_nodes(nodes: list[BaseNode]):
for n in nodes:
(NODE_DIR / f"{n.id_}.json").write_text(
json.dumps(n.to_dict(), ensure_ascii=False), encoding="utf8")
# ---- 加载 ----
def load_all_nodes() -> list[BaseNode]:
return [BaseNode.from_dict(json.loads(p.read_text(encoding="utf8")))
for p in NODE_DIR.glob("*.json")]
from llama_index.core import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter
splitter = SentenceSplitter(chunk_size=512, chunk_overlap=30)
# 新增文件 → 切节点 → 落盘
new_docs = SimpleDirectoryReader(input_files=["new_data/fa.txt"]).load_data()
new_nodes = splitter.get_nodes_from_documents(new_docs)
save_nodes(new_nodes) # 只写新增
def delete_nodes_by_docid(doc_id: str):
"""把属于同一篇 doc_id 的所有节点文件删掉"""
for p in NODE_DIR.glob("*.json"):
node = BaseNode.from_dict(json.loads(p.read_text(encoding="utf8")))
if node.metadata.get("doc_id") == doc_id: # 或你自己约定的字段
p.unlink()
调用示例:
delete_nodes_by_docid("doc_xxx") # 删掉整篇文档对应的所有节点
如果数据库记录doc_id对应的node_id那么可以
def delete_nodes_by_nodeid(node_id: str):
if NODE_DIR/f"{node_id}.json").exists():
return NODE_DIR/f"{node_id}.json").unlink()
else:
return False
all_nodes = load_all_nodes() # 含历史+新增
# 如果前面删过,这里自然就不包含被删的节点
retriever = ChineseBM25Retriever(nodes=all_nodes, similarity_top_k=10)
至此:
ChineseBM25Retriever(nodes=..., ...),2 min 内完成 100 万篇from pathlib import Path
import json
from llama_index.core.schema import BaseNode
NODE_DIR = Path("index_storage/nodes")
def load_all_nodes() -> list[BaseNode]:
"""一节点一文件 → 一次性全载入"""
return [BaseNode.from_dict(json.loads(p.read_text(encoding="utf8")))
for p in NODE_DIR.glob("*.json")]
# 后面想干嘛就干嘛
all_nodes = load_all_nodes()
retriever = ChineseBM25Retriever(nodes=all_nodes, similarity_top_k=10)
# -*- coding: utf-8 -*-
"""
llama-index-core 0.6.0 中文 BM25 增量索引
节点 -> JSON
BM5 每次重建(不再硬编码私有字段,永不出错)
"""
from __future__ import annotations
from typing import List
import json, jieba
from pathlib import Path
from llama_index.core import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter
from llama_index.retrievers.bm25 import BM25Retriever
from llama_index.core.schema import BaseNode, TextNode
# ---------------- 路径 ----------------
STORAGE_ROOT = Path("index_storage")
NODES_DIR = STORAGE_ROOT / "nodes"
ensure_dir = lambda p: p.mkdir(parents=True, exist_ok=True)
# ---------------- 节点 JSON 序列化 ----------------
def node_to_dict(node: BaseNode) -> dict:
return {
"id_": node.id_,
"text": node.text,
"metadata": node.metadata,
"excluded_embed_metadata_keys": node.excluded_embed_metadata_keys,
"excluded_llm_metadata_keys": node.excluded_llm_metadata_keys,
}
def dict_to_node(d: dict) -> TextNode:
return TextNode(**d)
def save_nodes(nodes: List[BaseNode]):
ensure_dir(NODES_DIR)
for n in nodes:
(NODES_DIR / f"{n.id_}.json").write_text(
json.dumps(node_to_dict(n), ensure_ascii=False), encoding="utf8")
def load_all_nodes() -> List[BaseNode]:
if not NODES_DIR.exists():
return []
return [dict_to_node(json.loads(p.read_text(encoding="utf8")))
for p in NODES_DIR.glob("*.json")]
# ---------------- 中文分词 ----------------
class ChineseBM25Retriever(BM25Retriever):
def _tokenize(self, text: str) -> List[str]:
return [w for w in jieba.cut(text) if w.strip()]
# ---------------- 业务逻辑 ----------------
splitter = SentenceSplitter(chunk_size=512, chunk_overlap=30)
def build_or_load_index():
"""只管理节点,BM5 每次都重建"""
all_nodes = load_all_nodes()
if all_nodes:
print(f">>> 加载已有节点:{len(all_nodes)} 条,正在重建 BM5...")
else:
print(">>> 首次构建节点...")
documents = SimpleDirectoryReader("data").load_data()
all_nodes = splitter.get_nodes_from_documents(documents)
save_nodes(all_nodes)
# ****** 每次都重建 BM5,不再碰任何私有属性 ******
retriever = ChineseBM25Retriever(nodes=all_nodes, similarity_top_k=10)
return all_nodes, retriever
def incremental_update(all_nodes: List[BaseNode]) -> ChineseBM25Retriever:
"""增量追加节点 -> 重建 BM5"""
input_files = ["new_data/fa.txt", "new_data/fa2.txt"]
new_docs = SimpleDirectoryReader(input_files=input_files).load_data()
new_nodes = splitter.get_nodes_from_documents(new_docs)
if not new_nodes:
print(">>> 无新增文件,直接复用现有 BM5")
return ChineseBM25Retriever(nodes=all_nodes, similarity_top_k=10)
print(f">>> 增量新增 {len(new_nodes)} 个节点,重建 BM5...")
save_nodes(new_nodes)
all_nodes.extend(new_nodes)
# ****** 重建 BM5,永不再出错 ******
return ChineseBM25Retriever(nodes=all_nodes, similarity_top_k=10)
# ---------------- 启动 ----------------
if __name__ == "__main__":
all_nodes, retriever = build_or_load_index()
retriever = incremental_update(all_nodes)
results = retriever.retrieve("what is look")
print(f">>> 查询召回 {len(results)} 条")
print(f">>> 总节点数 {len(all_nodes)}")
documents = SimpleDirectoryReader(
input_dir="data",
extensions=[".docx", ".xlsx"], # 精准筛选格式,避免读取无关文件
recursive=True # 可选:是否递归读取子目录内的文件(True=递归,False=仅根目录)
).load_data()
文件解析
pip install llama-index-readers-file
pip install python-docx pandas openpyxl xlrd
核心安装
pip install llama-index
pip install llama-index-embeddings-ollama llama-index-llms-ollama
外置保存
pip install llama-index-retrievers-bm25 jieba pickle
pip install chromadb llama-index-vector-stores-chroma
读取文件创建关键词BM25 可以增量但是删除修改要重建
from llama_index.core import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter
from llama_index.retrievers import bm25
import jieba
def chinese_tokenizer(text: str) -> List[str]:
return list(jieba.cut(text))
# 1. 读文档 切分
documents = SimpleDirectoryReader("data").load_data()
splitter = SentenceSplitter(chunk_size=512, chunk_overlap=30)
new_nodes = splitter.get_nodes_from_documents(documents)
retriever = bm25.BM25Retriever(nodes=new_nodes, similarity_top_k=10)
retriever.tokenizer = chinese_tokenizer
#此处可以省略
# 3. 默认查询
results = retriever.retrieve("author growing up")
print(f"召回 {len(results)} 条")
#新增多个文件切分
input_files = ["data/file1.docx", "data/file2.txt", "data/file3.pdf"]
docs = SimpleDirectoryReader(input_files=input_files).load_data()
# 2. 直接切分(无需区分格式)
splitter = SentenceSplitter(chunk_size=512, chunk_overlap=30)
nodes = splitter.get_nodes_from_documents(docs) # 一步到位
#重新加入文件
retriever = bm25.BM25Retriever(nodes=new_nodes, similarity_top_k=10)
retriever.tokenizer = chinese_tokenizer
#此处可以省略
# 3. 默认查询
results = retriever.retrieve("author growing up??")
print(f"召回 {len(results)} 条")
#上面第二步开始即可调用
# 1. pickle存储
import pickle
with open("./idx/bm25.pkl", "rb") as f:
bm25_retriever = pickle.load(f)
# 可热调参数
bm25_retriever.similarity_top_k = 15
##-----------
from llama_index.core import SimpleDirectoryReader
# 1. 读新增文件
new_docs = SimpleDirectoryReader("new_data").load_data()
# 2. 增量加入(官方支持 add_nodes)
# 4. ✅ 必须先切成 Node(官方示例缺失这一步)
splitter = SentenceSplitter(chunk_size=512, chunk_overlap=30)
new_nodes = splitter.get_nodes_from_documents(new_docs)
# 5. ✅ 只能重建
# 3. 都使用PICKLE覆盖
Path("./idx").mkdir(exist_ok=True)
with open("./idx/bm25.pkl", "wb") as f:
pickle.dump({"nodes": nodes, "top_k": 10, "tokenizer": chinese_tokenizer}, f)
# 查询(官方示例 ⑤ —— retrieve)
results = bm25_retriever.retrieve("作者童年做了什么")
for node in results:
print(node.text[:200], node.score)
#融合
from llama_index.core.retrievers import QueryFusionRetriever
from llama_index.core import VectorStoreIndex
# 1. 向量检索器
vector_index = VectorStoreIndex.from_documents(documents)
vector_retriever = vector_index.as_retriever(similarity_top_k=10)
# 2. BM25 检索器(已加载)pickle.load加载
bm25_retriever = pickle.load(open("./idx/bm25.pkl", "rb"))
# 3. 融合
fusion_retriever = QueryFusionRetriever(
[vector_retriever, bm25_retriever],
similarity_top_k=10,
)
# 4. 后处理链(同前)
query_engine = RetrieverQueryEngine(
retriever=fusion_retriever,
node_postprocessors=[...],
)
中文优化 jieba 要在from_documents中加入tokenizer=chinese_tokenizer
import jieba
def chinese_tokenizer(text: str) -> List[str]:
return list(jieba.cut(text))
# 1. 读取全部文档(含增量目录)
all_docs = (SimpleDirectoryReader("data").load_data() +
SimpleDirectoryReader("new_data").load_data())
nodes = SentenceSplitter(chunk_size=512, chunk_overlap=30).get_nodes_from_documents(all_docs)
bm25.tokenizer = chinese_tokenizer
retriever = bm25.BM25Retriever(nodes=nodes, similarity_top_k=10)
生成一个简单能用的:
# -*- coding: utf-8 -*-
# 0.6.0 实测接口:只有 retrieve,没有 add/add_nodes —— 只能“一次性重建”
import pickle, jieba
from pathlib import Path
from llama_index.core import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter
from llama_index.retrievers import bm25
def chinese_tokenizer(text: str) -> list[str]:
return list(jieba.cut(text))
# 1. 读取全部文档(含增量目录)
all_docs = (SimpleDirectoryReader("data").load_data() +
SimpleDirectoryReader("new_data").load_data())
# 2. 切节点
nodes = SentenceSplitter(chunk_size=512, chunk_overlap=30).get_nodes_from_documents(all_docs)
# 3. 建 retriever
retriever = bm25.BM25Retriever(nodes=nodes, similarity_top_k=10)
retriever.tokenizer = chinese_tokenizer
#retriever.stemmer = None
# 4. 落盘
Path("./idx").mkdir(exist_ok=True)
with open("./idx/bm25.pkl", "wb") as f:
pickle.dump({"nodes": nodes, "top_k": 10, "tokenizer": chinese_tokenizer}, f)
# 5. 查询
for n in retriever.retrieve("what is look?"):
print(n.score, n.text[:200])
这样:
from typing import List
import jieba
from llama_index.core import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter
from llama_index.retrievers.bm25 import BM25Retriever
# ===================== 核心适配:重写BM25Retriever类以支持中文分词 =====================
class ChineseBM25Retriever(BM25Retriever):
"""适配0.6.0版本的中文BM25检索器(重写分词方法)"""
def _tokenize(self, text: str) -> List[str]:
"""重写内部分词方法,替换为jieba中文分词"""
# 过滤空字符串,避免影响BM25计算
return [word for word in jieba.cut(text) if word.strip()]
# ===================== 1. 初始加载文档 + 切分 + 初始化检索器 =====================
# 加载初始目录下的所有文档
documents = SimpleDirectoryReader("data").load_data()
# 统一初始化切分器(仅初始化一次,保证切分规则一致)
splitter = SentenceSplitter(chunk_size=512, chunk_overlap=30)
# 切分初始文档为节点
original_nodes = splitter.get_nodes_from_documents(documents)
# 【关键修改】使用自定义的中文BM25检索器(0.6.0版本无tokenizer参数)
retriever = ChineseBM25Retriever(
nodes=original_nodes,
similarity_top_k=10 # 仅保留支持的参数
)
# 初始查询
first_results = retriever.retrieve("what is look")
print(f"初始查询 - 召回 {len(first_results)} 条")
# ===================== 2. 新增多个文件 + 切分 + 增量追加 =====================
# 加载新增的混合格式文件
input_files = ["new_data/fa.txt", "new_data/fa2.txt"]
new_docs = SimpleDirectoryReader(input_files=input_files).load_data()
# 用同一个切分器切分新增文件
new_nodes = splitter.get_nodes_from_documents(new_docs)
# 合并所有节点后重新初始化检索器(0.6.0版本无nodes属性,只能重新初始化)
all_nodes = original_nodes + new_nodes
retriever = ChineseBM25Retriever(
nodes=all_nodes,
similarity_top_k=10
)
# ===================== 3. 新增文件后重新查询 =====================
second_results = retriever.retrieve("what is look?")
print(f"新增文件后查询 - 召回 {len(second_results)} 条")
# 验证节点数
print(f"\n初始节点数:{len(original_nodes)}")
print(f"新增节点数:{len(new_nodes)}")
print(f"当前检索器总节点数:{len(all_nodes)}")