- 文档存储的node 信息 全部json的保存
- 文件大小
- 文件创建
- 修改时间
- 上传人
- 上传部门
- 上传tag
- 上传tag
- doc_id
- metadata 的一个JSON表
- json列表 :node_id
print("📌 【原始Document文档ID】:", new_docs[0].id_) # fa.txt对应的文档根ID
print("\n📌 【切分后Node节点ID列表】(落盘JSON文件名):")
for node in new_nodes:
print(f"→ 节点ID:{node.id_} | 关联文档ID:{node.metadata['document_id']}")
文档.id_. ==node.metadata['document_id']
node.id_
def load_all_nodes() -> list[BaseNode]:
return [BaseNode.from_dict(json.loads(p.read_text(encoding="utf8")))
for p in NODE_DIR.glob("*.json")]
关键词召回 BM25
这里ODE_DIR.glob("*.json")也可以使用数据库生成一个node的json文件类表 此处缓存数据库查询后返回的node信息比较好
实现精确控制对吧
比如 数据库 查询出
综合部 有2000个node节点。
然后 给出列表
进行bm25
然后再查询
向量召回
使用刚刚数据库查询出来的document_id 来指定metadata['document_id']后检索
再向量召回
SQLITE查询载入node信息
def load_all_nodes() -> List[BaseNode]:
"""
从 SQLite 数据库加载所有节点。
假设表 `nodes` 中有字段: id (TEXT), node_json (TEXT)
"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
try:
cursor.execute("SELECT node_json FROM nodes")
rows = cursor.fetchall()
finally:
conn.close()
nodes = []
for (node_json_str,) in rows:
# 将 JSON 字符串解析为 dict,再反序列化为 BaseNode
node_dict = json.loads(node_json_str)
node = BaseNode.from_dict(node_dict)
nodes.append(node)
return nodes
根据metadata查询并返回node信息
"""
根据 department 值,从 SQLite 查询 metadata 中匹配的节点。
要求表结构:
CREATE TABLE nodes (
id TEXT PRIMARY KEY,
metadata TEXT NOT NULL, -- 存 JSON 字符串,如 '{"department": "综合部", ...}'
node_json TEXT NOT NULL -- 完整 BaseNode 的 JSON
);
"""
def load_all_nodes(department: str) -> List[BaseNode]:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
try:
# 使用 ? 占位符,安全传参
cursor.execute(
"SELECT node_json FROM nodes WHERE json_extract(metadata, '$.department') = ?",
(department,)
)
rows = cursor.fetchall()
finally:
conn.close()
# 反序列化为 BaseNode 列表
nodes = []
for (node_json_str,) in rows:
node_dict = json.loads(node_json_str)
node = BaseNode.from_dict(node_dict)
nodes.append(node)
return nodes
retriever = BM25Retriever.from_defaults(
nodes=nodes,
similarity_top_k=top_k
)