月度归档:2025年12月

RAG原理

MVP 小规模 1万文档
SQlite 存储node信息,查询后实时BM25再关键词召回
chroma向量召回

大规模企业 10万文档以下
PostgreSQL  节点+关键词;直接关键词召回
(使用 PostgreSQL 全文检索(tsvector,适合英文/分词后中文)
直接SQL语句 需要提前使用jieba分词)
把 PostgreSQL检索封装成 BaseRetriever
Milvus向量召回

文档表 node信息json字段保存,metadata信息JSON保存

  • 文档存储的node 信息 全部json的保存
  • 文件大小
  • 文件创建
  • 修改时间
  • 上传人
  • 上传部门
  • 上传tag
  • 上传tag
  • doc_id
  • metadata 的一个JSON表
  • json列表 :node_id
print("📌 【原始Document文档ID】:", new_docs[0].id_)  # fa.txt对应的文档根ID
print("\n📌 【切分后Node节点ID列表】(落盘JSON文件名):")
for node in new_nodes:
    print(f"→ 节点ID:{node.id_} | 关联文档ID:{node.metadata['document_id']}")

文档.id_.  ==node.metadata['document_id']
node.id_
def load_all_nodes() -> list[BaseNode]:
    return [BaseNode.from_dict(json.loads(p.read_text(encoding="utf8")))
            for p in NODE_DIR.glob("*.json")]

关键词召回 BM25
这里ODE_DIR.glob("*.json")也可以使用数据库生成一个node的json文件类表 此处缓存数据库查询后返回的node信息比较好
实现精确控制对吧
比如 数据库 查询出 
综合部 有2000个node节点。
然后 给出列表 
进行bm25 
然后再查询

向量召回
使用刚刚数据库查询出来的document_id 来指定metadata['document_id']后检索
再向量召回

SQLITE查询载入node信息

def load_all_nodes() -> List[BaseNode]:
    """
    从 SQLite 数据库加载所有节点。
    假设表 `nodes` 中有字段: id (TEXT), node_json (TEXT)
    """
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()

    try:
        cursor.execute("SELECT node_json FROM nodes")
        rows = cursor.fetchall()
    finally:
        conn.close()

    nodes = []
    for (node_json_str,) in rows:
        # 将 JSON 字符串解析为 dict,再反序列化为 BaseNode
        node_dict = json.loads(node_json_str)
        node = BaseNode.from_dict(node_dict)
        nodes.append(node)

    return nodes

根据metadata查询并返回node信息

    """
    根据 department 值,从 SQLite 查询 metadata 中匹配的节点。
    
    要求表结构:
        CREATE TABLE nodes (
            id TEXT PRIMARY KEY,
            metadata TEXT NOT NULL,   -- 存 JSON 字符串,如 '{"department": "综合部", ...}'
            node_json TEXT NOT NULL   -- 完整 BaseNode 的 JSON
        );
    """

def load_all_nodes(department: str) -> List[BaseNode]:
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    try:
        # 使用 ? 占位符,安全传参
        cursor.execute(
            "SELECT node_json FROM nodes WHERE json_extract(metadata, '$.department') = ?",
            (department,)
        )
        rows = cursor.fetchall()
    finally:
        conn.close()
    # 反序列化为 BaseNode 列表
    nodes = []
    for (node_json_str,) in rows:
        node_dict = json.loads(node_json_str)
        node = BaseNode.from_dict(node_dict)
        nodes.append(node)
    return nodes

retriever = BM25Retriever.from_defaults(
        nodes=nodes,
        similarity_top_k=top_k
    )

llamaindex节点 存储JSON 载入 增删


  1. 节点 ↔ JSON(落盘 & 载入) 每个node保存一个文件的节点名即ID值
from pathlib import Path
import json
from llama_index.core.schema import TextNode, BaseNode

NODE_DIR = Path("index_storage/nodes")
NODE_DIR.mkdir(parents=True, exist_ok=True)

# ---- 保存 ----
def save_nodes(nodes: list[BaseNode]):
    for n in nodes:
        (NODE_DIR / f"{n.id_}.json").write_text(
            json.dumps(n.to_dict(), ensure_ascii=False), encoding="utf8")

# ---- 加载 ----
def load_all_nodes() -> list[BaseNode]:
    return [BaseNode.from_dict(json.loads(p.read_text(encoding="utf8")))
            for p in NODE_DIR.glob("*.json")]

  1. 节点添加(增量文件示例)
from llama_index.core import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter

splitter = SentenceSplitter(chunk_size=512, chunk_overlap=30)

# 新增文件 → 切节点 → 落盘
new_docs = SimpleDirectoryReader(input_files=["new_data/fa.txt"]).load_data()
new_nodes = splitter.get_nodes_from_documents(new_docs)
save_nodes(new_nodes)          # 只写新增

  1. 节点删除(按文档 ID 批量删)
def delete_nodes_by_docid(doc_id: str):
    """把属于同一篇 doc_id 的所有节点文件删掉"""
    for p in NODE_DIR.glob("*.json"):
        node = BaseNode.from_dict(json.loads(p.read_text(encoding="utf8")))
        if node.metadata.get("doc_id") == doc_id:   # 或你自己约定的字段
            p.unlink()

调用示例:

delete_nodes_by_docid("doc_xxx")   # 删掉整篇文档对应的所有节点

如果数据库记录doc_id对应的node_id那么可以

def delete_nodes_by_nodeid(node_id: str):
    if NODE_DIR/f"{node_id}.json").exists():
        return NODE_DIR/f"{node_id}.json").unlink()
    else:
        return False

  1. 完整增量流程(增/删后重建 BM25)
all_nodes = load_all_nodes()       # 含历史+新增
# 如果前面删过,这里自然就不包含被删的节点
retriever = ChineseBM25Retriever(nodes=all_nodes, similarity_top_k=10)

至此:

  • 节点以可读 JSON 形式永久存放
  • 增 / 删都只是文件级操作,无需碰任何私有属性
  • 重建 BM25 只需重新 ChineseBM25Retriever(nodes=..., ...)2 min 内完成 100 万篇

全盘载入JSON

from pathlib import Path
import json
from llama_index.core.schema import BaseNode

NODE_DIR = Path("index_storage/nodes")

def load_all_nodes() -> list[BaseNode]:
    """一节点一文件 → 一次性全载入"""
    return [BaseNode.from_dict(json.loads(p.read_text(encoding="utf8")))
            for p in NODE_DIR.glob("*.json")]

# 后面想干嘛就干嘛
all_nodes = load_all_nodes()
retriever = ChineseBM25Retriever(nodes=all_nodes, similarity_top_k=10)

llamaindex BM25 新建 添加 重建(使用json存储nodes节点信息)

# -*- coding: utf-8 -*-
"""
llama-index-core 0.6.0 中文 BM25 增量索引
节点 -> JSON
BM5 每次重建(不再硬编码私有字段,永不出错)
"""
from __future__ import annotations
from typing import List
import json, jieba
from pathlib import Path

from llama_index.core import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter
from llama_index.retrievers.bm25 import BM25Retriever
from llama_index.core.schema import BaseNode, TextNode

# ---------------- 路径 ----------------
STORAGE_ROOT = Path("index_storage")
NODES_DIR   = STORAGE_ROOT / "nodes"

ensure_dir = lambda p: p.mkdir(parents=True, exist_ok=True)

# ---------------- 节点 JSON 序列化 ----------------
def node_to_dict(node: BaseNode) -> dict:
    return {
        "id_": node.id_,
        "text": node.text,
        "metadata": node.metadata,
        "excluded_embed_metadata_keys": node.excluded_embed_metadata_keys,
        "excluded_llm_metadata_keys":   node.excluded_llm_metadata_keys,
    }

def dict_to_node(d: dict) -> TextNode:
    return TextNode(**d)

def save_nodes(nodes: List[BaseNode]):
    ensure_dir(NODES_DIR)
    for n in nodes:
        (NODES_DIR / f"{n.id_}.json").write_text(
            json.dumps(node_to_dict(n), ensure_ascii=False), encoding="utf8")

def load_all_nodes() -> List[BaseNode]:
    if not NODES_DIR.exists():
        return []
    return [dict_to_node(json.loads(p.read_text(encoding="utf8")))
            for p in NODES_DIR.glob("*.json")]

# ---------------- 中文分词 ----------------
class ChineseBM25Retriever(BM25Retriever):
    def _tokenize(self, text: str) -> List[str]:
        return [w for w in jieba.cut(text) if w.strip()]

# ---------------- 业务逻辑 ----------------
splitter = SentenceSplitter(chunk_size=512, chunk_overlap=30)

def build_or_load_index():
    """只管理节点,BM5 每次都重建"""
    all_nodes = load_all_nodes()
    if all_nodes:
        print(f">>> 加载已有节点:{len(all_nodes)} 条,正在重建 BM5...")
    else:
        print(">>> 首次构建节点...")
        documents = SimpleDirectoryReader("data").load_data()
        all_nodes = splitter.get_nodes_from_documents(documents)
        save_nodes(all_nodes)
    # ****** 每次都重建 BM5,不再碰任何私有属性 ******
    retriever = ChineseBM25Retriever(nodes=all_nodes, similarity_top_k=10)
    return all_nodes, retriever

def incremental_update(all_nodes: List[BaseNode]) -> ChineseBM25Retriever:
    """增量追加节点 -> 重建 BM5"""
    input_files = ["new_data/fa.txt", "new_data/fa2.txt"]
    new_docs = SimpleDirectoryReader(input_files=input_files).load_data()
    new_nodes = splitter.get_nodes_from_documents(new_docs)
    if not new_nodes:
        print(">>> 无新增文件,直接复用现有 BM5")
        return ChineseBM25Retriever(nodes=all_nodes, similarity_top_k=10)

    print(f">>> 增量新增 {len(new_nodes)} 个节点,重建 BM5...")
    save_nodes(new_nodes)
    all_nodes.extend(new_nodes)
    # ****** 重建 BM5,永不再出错 ******
    return ChineseBM25Retriever(nodes=all_nodes, similarity_top_k=10)

# ---------------- 启动 ----------------
if __name__ == "__main__":
    all_nodes, retriever = build_or_load_index()
    retriever = incremental_update(all_nodes)

    results = retriever.retrieve("what is look")
    print(f">>> 查询召回 {len(results)} 条")
    print(f">>> 总节点数 {len(all_nodes)}")

安装试验

  • llama-index
  • llama-index-llms-ollama
  • llama-index-embeddings-ollama
  • llama-index-readers-file 可以读取多种文档,默认只能存文本 结合 SimpleDirectoryReader 一起使用解析文档
  • python-docx 解析docx文档
  • pandas openpyxl xlrd 解析xlsx文档
  • chromadb llama-index-vector-stores-chroma
  • llama-index-retrievers-bm25 jieba (pickle为内置模块无需安装) 1万以下文档
documents = SimpleDirectoryReader(
    input_dir="data",
    extensions=[".docx", ".xlsx"],  # 精准筛选格式,避免读取无关文件
    recursive=True  # 可选:是否递归读取子目录内的文件(True=递归,False=仅根目录)
).load_data()

文件解析

pip install llama-index-readers-file 
pip install python-docx pandas  openpyxl  xlrd

核心安装

pip install llama-index
pip install llama-index-embeddings-ollama llama-index-llms-ollama

外置保存

pip install llama-index-retrievers-bm25 jieba pickle
pip install chromadb llama-index-vector-stores-chroma 

读取文件创建关键词BM25 可以增量但是删除修改要重建

from llama_index.core import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter
from llama_index.retrievers import bm25
import jieba

def chinese_tokenizer(text: str) -> List[str]:
    return list(jieba.cut(text))

# 1. 读文档 切分
documents = SimpleDirectoryReader("data").load_data()
splitter = SentenceSplitter(chunk_size=512, chunk_overlap=30)
new_nodes = splitter.get_nodes_from_documents(documents)


retriever = bm25.BM25Retriever(nodes=new_nodes, similarity_top_k=10)
retriever.tokenizer = chinese_tokenizer

#此处可以省略
# 3. 默认查询
results = retriever.retrieve("author growing up")
print(f"召回 {len(results)} 条")

#新增多个文件切分

input_files = ["data/file1.docx", "data/file2.txt", "data/file3.pdf"]
docs = SimpleDirectoryReader(input_files=input_files).load_data()

# 2. 直接切分(无需区分格式)
splitter = SentenceSplitter(chunk_size=512, chunk_overlap=30)
nodes = splitter.get_nodes_from_documents(docs)  # 一步到位

#重新加入文件
retriever = bm25.BM25Retriever(nodes=new_nodes, similarity_top_k=10)
retriever.tokenizer = chinese_tokenizer

#此处可以省略
# 3. 默认查询
results = retriever.retrieve("author growing up??")
print(f"召回 {len(results)} 条")


#上面第二步开始即可调用
# 1. pickle存储
import pickle

with open("./idx/bm25.pkl", "rb") as f:
    bm25_retriever = pickle.load(f)

# 可热调参数
bm25_retriever.similarity_top_k = 15
##-----------

from llama_index.core import SimpleDirectoryReader

# 1. 读新增文件
new_docs = SimpleDirectoryReader("new_data").load_data()

# 2. 增量加入(官方支持 add_nodes)

# 4. ✅ 必须先切成 Node(官方示例缺失这一步)
splitter = SentenceSplitter(chunk_size=512, chunk_overlap=30)
new_nodes = splitter.get_nodes_from_documents(new_docs)

# 5. ✅ 只能重建


# 3.  都使用PICKLE覆盖
Path("./idx").mkdir(exist_ok=True)
with open("./idx/bm25.pkl", "wb") as f:
    pickle.dump({"nodes": nodes, "top_k": 10, "tokenizer": chinese_tokenizer}, f)

# 查询(官方示例 ⑤ —— retrieve)
results = bm25_retriever.retrieve("作者童年做了什么")
for node in results:
    print(node.text[:200], node.score)


#融合
from llama_index.core.retrievers import QueryFusionRetriever
from llama_index.core import VectorStoreIndex

# 1. 向量检索器
vector_index = VectorStoreIndex.from_documents(documents)
vector_retriever = vector_index.as_retriever(similarity_top_k=10)

# 2. BM25 检索器(已加载)pickle.load加载
bm25_retriever = pickle.load(open("./idx/bm25.pkl", "rb"))





# 3. 融合
fusion_retriever = QueryFusionRetriever(
    [vector_retriever, bm25_retriever],
    similarity_top_k=10,
)

# 4. 后处理链(同前)
query_engine = RetrieverQueryEngine(
    retriever=fusion_retriever,
    node_postprocessors=[...],
)





中文优化 jieba 要在from_documents中加入tokenizer=chinese_tokenizer

import jieba

def chinese_tokenizer(text: str) -> List[str]:
    return list(jieba.cut(text))

# 1. 读取全部文档(含增量目录)
all_docs = (SimpleDirectoryReader("data").load_data() +
            SimpleDirectoryReader("new_data").load_data())
nodes = SentenceSplitter(chunk_size=512, chunk_overlap=30).get_nodes_from_documents(all_docs)

bm25.tokenizer = chinese_tokenizer
retriever = bm25.BM25Retriever(nodes=nodes, similarity_top_k=10)

生成一个简单能用的:

# -*- coding: utf-8 -*-
# 0.6.0 实测接口:只有 retrieve,没有 add/add_nodes —— 只能“一次性重建”
import pickle, jieba
from pathlib import Path
from llama_index.core import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter
from llama_index.retrievers import bm25

def chinese_tokenizer(text: str) -> list[str]:
    return list(jieba.cut(text))

# 1. 读取全部文档(含增量目录)
all_docs = (SimpleDirectoryReader("data").load_data() +
            SimpleDirectoryReader("new_data").load_data())

# 2. 切节点
nodes = SentenceSplitter(chunk_size=512, chunk_overlap=30).get_nodes_from_documents(all_docs)

# 3. 建 retriever

retriever = bm25.BM25Retriever(nodes=nodes, similarity_top_k=10)
retriever.tokenizer = chinese_tokenizer
#retriever.stemmer = None
# 4. 落盘
Path("./idx").mkdir(exist_ok=True)
with open("./idx/bm25.pkl", "wb") as f:
    pickle.dump({"nodes": nodes, "top_k": 10, "tokenizer": chinese_tokenizer}, f)

# 5. 查询
for n in retriever.retrieve("what is look?"):
    print(n.score, n.text[:200])
  1. 按权限标签/用户把原始文档切成 N 个物理子文件夹(或数据库分区)→
  2. 每个子集分别建 BM25 pickle + 向量索引
  3. 查询时先根据当前用户权限选对应的 pickle & 向量库 →
  4. 用同一套混合逻辑(RRF/加权)召回 →
  5. 最终返回结果。

这样:

  • BM25 召回范围天然受限于子集,不会越权
  • 向量侧仍保留 metadata,可做更细粒度二次过滤展示字段
  • 子库之间完全隔离,加用户/改权限只需新增/重建对应子库,不影响别人。

新的 添加节点 测试通过BM25版本0.6.0

from typing import List
import jieba
from llama_index.core import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter
from llama_index.retrievers.bm25 import BM25Retriever

# ===================== 核心适配:重写BM25Retriever类以支持中文分词 =====================
class ChineseBM25Retriever(BM25Retriever):
    """适配0.6.0版本的中文BM25检索器(重写分词方法)"""
    def _tokenize(self, text: str) -> List[str]:
        """重写内部分词方法,替换为jieba中文分词"""
        # 过滤空字符串,避免影响BM25计算
        return [word for word in jieba.cut(text) if word.strip()]

# ===================== 1. 初始加载文档 + 切分 + 初始化检索器 =====================
# 加载初始目录下的所有文档
documents = SimpleDirectoryReader("data").load_data()
# 统一初始化切分器(仅初始化一次,保证切分规则一致)
splitter = SentenceSplitter(chunk_size=512, chunk_overlap=30)
# 切分初始文档为节点
original_nodes = splitter.get_nodes_from_documents(documents)

# 【关键修改】使用自定义的中文BM25检索器(0.6.0版本无tokenizer参数)
retriever = ChineseBM25Retriever(
    nodes=original_nodes, 
    similarity_top_k=10  # 仅保留支持的参数
)

# 初始查询
first_results = retriever.retrieve("what is look")
print(f"初始查询 - 召回 {len(first_results)} 条")

# ===================== 2. 新增多个文件 + 切分 + 增量追加 =====================
# 加载新增的混合格式文件
input_files = ["new_data/fa.txt", "new_data/fa2.txt"]
new_docs = SimpleDirectoryReader(input_files=input_files).load_data()
# 用同一个切分器切分新增文件
new_nodes = splitter.get_nodes_from_documents(new_docs)

# 合并所有节点后重新初始化检索器(0.6.0版本无nodes属性,只能重新初始化)
all_nodes = original_nodes + new_nodes
retriever = ChineseBM25Retriever(
    nodes=all_nodes,
    similarity_top_k=10
)

# ===================== 3. 新增文件后重新查询 =====================
second_results = retriever.retrieve("what is look?")
print(f"新增文件后查询 - 召回 {len(second_results)} 条")

# 验证节点数
print(f"\n初始节点数:{len(original_nodes)}")
print(f"新增节点数:{len(new_nodes)}")
print(f"当前检索器总节点数:{len(all_nodes)}")