PDF扫描文件加入RAG

1 PDF分页为PNG输出 pdf2image

from pdf2image import convert_from_path
pages = convert_from_path(pdf_path, dpi=200, fmt='png')  # 逐页 PIL.Image
for i, page_img in enumerate(pages, 1):
        # 图片 → base64(不落地磁盘)
        import base64, io
        buffer = io.BytesIO()
        page_img.save(buffer, format='PNG')
        img_b64 = base64.b64encode(buffer.getvalue()).decode()

2调用OCR程序 ,可以使用ollama 直接将图片转为text 然后逐步添加为一个文本变量 添加文件信息的metadata

import ollama  # 需安装:pip install ollama
response = ollama.chat( model=model, messages=[ { "role": "user", "content": [ # 文本指令 {"type": "text", "text": "请识别这张图片中的所有文字,按原格式输出,不要解释。"}, # Base64 图片(type 需匹配格式:image/png 或 image/jpeg) {"type": "image/png", "data": img_b64} ] } ] )

3直接是将文本变量切分 然后向量化

RAG原理

MVP 小规模 1万文档
SQlite 存储node信息,查询后实时BM25再关键词召回
chroma向量召回

大规模企业 10万文档以下
PostgreSQL  节点+关键词;直接关键词召回
(使用 PostgreSQL 全文检索(tsvector,适合英文/分词后中文)
直接SQL语句 需要提前使用jieba分词)
把 PostgreSQL检索封装成 BaseRetriever
Milvus向量召回

文档表 node信息json字段保存,metadata信息JSON保存

  • 文档存储的node 信息 全部json的保存
  • 文件大小
  • 文件创建
  • 修改时间
  • 上传人
  • 上传部门
  • 上传tag
  • 上传tag
  • doc_id
  • metadata 的一个JSON表
  • json列表 :node_id
print("📌 【原始Document文档ID】:", new_docs[0].id_)  # fa.txt对应的文档根ID
print("\n📌 【切分后Node节点ID列表】(落盘JSON文件名):")
for node in new_nodes:
    print(f"→ 节点ID:{node.id_} | 关联文档ID:{node.metadata['document_id']}")

文档.id_.  ==node.metadata['document_id']
node.id_
def load_all_nodes() -> list[BaseNode]:
    return [BaseNode.from_dict(json.loads(p.read_text(encoding="utf8")))
            for p in NODE_DIR.glob("*.json")]

关键词召回 BM25
这里ODE_DIR.glob("*.json")也可以使用数据库生成一个node的json文件类表 此处缓存数据库查询后返回的node信息比较好
实现精确控制对吧
比如 数据库 查询出 
综合部 有2000个node节点。
然后 给出列表 
进行bm25 
然后再查询

向量召回
使用刚刚数据库查询出来的document_id 来指定metadata['document_id']后检索
再向量召回

SQLITE查询载入node信息

def load_all_nodes() -> List[BaseNode]:
    """
    从 SQLite 数据库加载所有节点。
    假设表 `nodes` 中有字段: id (TEXT), node_json (TEXT)
    """
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()

    try:
        cursor.execute("SELECT node_json FROM nodes")
        rows = cursor.fetchall()
    finally:
        conn.close()

    nodes = []
    for (node_json_str,) in rows:
        # 将 JSON 字符串解析为 dict,再反序列化为 BaseNode
        node_dict = json.loads(node_json_str)
        node = BaseNode.from_dict(node_dict)
        nodes.append(node)

    return nodes

根据metadata查询并返回node信息

    """
    根据 department 值,从 SQLite 查询 metadata 中匹配的节点。
    
    要求表结构:
        CREATE TABLE nodes (
            id TEXT PRIMARY KEY,
            metadata TEXT NOT NULL,   -- 存 JSON 字符串,如 '{"department": "综合部", ...}'
            node_json TEXT NOT NULL   -- 完整 BaseNode 的 JSON
        );
    """

def load_all_nodes(department: str) -> List[BaseNode]:
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    try:
        # 使用 ? 占位符,安全传参
        cursor.execute(
            "SELECT node_json FROM nodes WHERE json_extract(metadata, '$.department') = ?",
            (department,)
        )
        rows = cursor.fetchall()
    finally:
        conn.close()
    # 反序列化为 BaseNode 列表
    nodes = []
    for (node_json_str,) in rows:
        node_dict = json.loads(node_json_str)
        node = BaseNode.from_dict(node_dict)
        nodes.append(node)
    return nodes

retriever = BM25Retriever.from_defaults(
        nodes=nodes,
        similarity_top_k=top_k
    )

llamaindex节点 存储JSON 载入 增删


  1. 节点 ↔ JSON(落盘 & 载入) 每个node保存一个文件的节点名即ID值
from pathlib import Path
import json
from llama_index.core.schema import TextNode, BaseNode

NODE_DIR = Path("index_storage/nodes")
NODE_DIR.mkdir(parents=True, exist_ok=True)

# ---- 保存 ----
def save_nodes(nodes: list[BaseNode]):
    for n in nodes:
        (NODE_DIR / f"{n.id_}.json").write_text(
            json.dumps(n.to_dict(), ensure_ascii=False), encoding="utf8")

# ---- 加载 ----
def load_all_nodes() -> list[BaseNode]:
    return [BaseNode.from_dict(json.loads(p.read_text(encoding="utf8")))
            for p in NODE_DIR.glob("*.json")]

  1. 节点添加(增量文件示例)
from llama_index.core import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter

splitter = SentenceSplitter(chunk_size=512, chunk_overlap=30)

# 新增文件 → 切节点 → 落盘
new_docs = SimpleDirectoryReader(input_files=["new_data/fa.txt"]).load_data()
new_nodes = splitter.get_nodes_from_documents(new_docs)
save_nodes(new_nodes)          # 只写新增

  1. 节点删除(按文档 ID 批量删)
def delete_nodes_by_docid(doc_id: str):
    """把属于同一篇 doc_id 的所有节点文件删掉"""
    for p in NODE_DIR.glob("*.json"):
        node = BaseNode.from_dict(json.loads(p.read_text(encoding="utf8")))
        if node.metadata.get("doc_id") == doc_id:   # 或你自己约定的字段
            p.unlink()

调用示例:

delete_nodes_by_docid("doc_xxx")   # 删掉整篇文档对应的所有节点

如果数据库记录doc_id对应的node_id那么可以

def delete_nodes_by_nodeid(node_id: str):
    if NODE_DIR/f"{node_id}.json").exists():
        return NODE_DIR/f"{node_id}.json").unlink()
    else:
        return False

  1. 完整增量流程(增/删后重建 BM25)
all_nodes = load_all_nodes()       # 含历史+新增
# 如果前面删过,这里自然就不包含被删的节点
retriever = ChineseBM25Retriever(nodes=all_nodes, similarity_top_k=10)

至此:

  • 节点以可读 JSON 形式永久存放
  • 增 / 删都只是文件级操作,无需碰任何私有属性
  • 重建 BM25 只需重新 ChineseBM25Retriever(nodes=..., ...)2 min 内完成 100 万篇

全盘载入JSON

from pathlib import Path
import json
from llama_index.core.schema import BaseNode

NODE_DIR = Path("index_storage/nodes")

def load_all_nodes() -> list[BaseNode]:
    """一节点一文件 → 一次性全载入"""
    return [BaseNode.from_dict(json.loads(p.read_text(encoding="utf8")))
            for p in NODE_DIR.glob("*.json")]

# 后面想干嘛就干嘛
all_nodes = load_all_nodes()
retriever = ChineseBM25Retriever(nodes=all_nodes, similarity_top_k=10)

llamaindex BM25 新建 添加 重建(使用json存储nodes节点信息)

# -*- coding: utf-8 -*-
"""
llama-index-core 0.6.0 中文 BM25 增量索引
节点 -> JSON
BM5 每次重建(不再硬编码私有字段,永不出错)
"""
from __future__ import annotations
from typing import List
import json, jieba
from pathlib import Path

from llama_index.core import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter
from llama_index.retrievers.bm25 import BM25Retriever
from llama_index.core.schema import BaseNode, TextNode

# ---------------- 路径 ----------------
STORAGE_ROOT = Path("index_storage")
NODES_DIR   = STORAGE_ROOT / "nodes"

ensure_dir = lambda p: p.mkdir(parents=True, exist_ok=True)

# ---------------- 节点 JSON 序列化 ----------------
def node_to_dict(node: BaseNode) -> dict:
    return {
        "id_": node.id_,
        "text": node.text,
        "metadata": node.metadata,
        "excluded_embed_metadata_keys": node.excluded_embed_metadata_keys,
        "excluded_llm_metadata_keys":   node.excluded_llm_metadata_keys,
    }

def dict_to_node(d: dict) -> TextNode:
    return TextNode(**d)

def save_nodes(nodes: List[BaseNode]):
    ensure_dir(NODES_DIR)
    for n in nodes:
        (NODES_DIR / f"{n.id_}.json").write_text(
            json.dumps(node_to_dict(n), ensure_ascii=False), encoding="utf8")

def load_all_nodes() -> List[BaseNode]:
    if not NODES_DIR.exists():
        return []
    return [dict_to_node(json.loads(p.read_text(encoding="utf8")))
            for p in NODES_DIR.glob("*.json")]

# ---------------- 中文分词 ----------------
class ChineseBM25Retriever(BM25Retriever):
    def _tokenize(self, text: str) -> List[str]:
        return [w for w in jieba.cut(text) if w.strip()]

# ---------------- 业务逻辑 ----------------
splitter = SentenceSplitter(chunk_size=512, chunk_overlap=30)

def build_or_load_index():
    """只管理节点,BM5 每次都重建"""
    all_nodes = load_all_nodes()
    if all_nodes:
        print(f">>> 加载已有节点:{len(all_nodes)} 条,正在重建 BM5...")
    else:
        print(">>> 首次构建节点...")
        documents = SimpleDirectoryReader("data").load_data()
        all_nodes = splitter.get_nodes_from_documents(documents)
        save_nodes(all_nodes)
    # ****** 每次都重建 BM5,不再碰任何私有属性 ******
    retriever = ChineseBM25Retriever(nodes=all_nodes, similarity_top_k=10)
    return all_nodes, retriever

def incremental_update(all_nodes: List[BaseNode]) -> ChineseBM25Retriever:
    """增量追加节点 -> 重建 BM5"""
    input_files = ["new_data/fa.txt", "new_data/fa2.txt"]
    new_docs = SimpleDirectoryReader(input_files=input_files).load_data()
    new_nodes = splitter.get_nodes_from_documents(new_docs)
    if not new_nodes:
        print(">>> 无新增文件,直接复用现有 BM5")
        return ChineseBM25Retriever(nodes=all_nodes, similarity_top_k=10)

    print(f">>> 增量新增 {len(new_nodes)} 个节点,重建 BM5...")
    save_nodes(new_nodes)
    all_nodes.extend(new_nodes)
    # ****** 重建 BM5,永不再出错 ******
    return ChineseBM25Retriever(nodes=all_nodes, similarity_top_k=10)

# ---------------- 启动 ----------------
if __name__ == "__main__":
    all_nodes, retriever = build_or_load_index()
    retriever = incremental_update(all_nodes)

    results = retriever.retrieve("what is look")
    print(f">>> 查询召回 {len(results)} 条")
    print(f">>> 总节点数 {len(all_nodes)}")