sqlite中json 键 匹配多个metadata

— 匹配 metadata 中 category=技术文档 AND author=张三
SELECT * FROM vector_store
WHERE json_extract(metadata, ‘$.category’) = ‘技术文档’
AND json_extract(metadata, ‘$.author’) = ‘张三’;

发表在 None | 留下评论

PDF扫描文件加入RAG

1 PDF分页为PNG输出 pdf2image

from pdf2image import convert_from_path
pages = convert_from_path(pdf_path, dpi=200, fmt='png')  # 逐页 PIL.Image
for i, page_img in enumerate(pages, 1):
        # 图片 → base64(不落地磁盘)
        import base64, io
        buffer = io.BytesIO()
        page_img.save(buffer, format='PNG')
        img_b64 = base64.b64encode(buffer.getvalue()).decode()

2调用OCR程序 ,可以使用ollama 直接将图片转为text 然后逐步添加为一个文本变量 添加文件信息的metadata

import ollama  # 需安装:pip install ollama
response = ollama.chat( model=model, messages=[ { "role": "user", "content": [ # 文本指令 {"type": "text", "text": "请识别这张图片中的所有文字,按原格式输出,不要解释。"}, # Base64 图片(type 需匹配格式:image/png 或 image/jpeg) {"type": "image/png", "data": img_b64} ] } ] )

3直接是将文本变量切分 然后向量化

发表在 None | 留下评论

RAG原理

MVP 小规模 1万文档
SQlite 存储node信息,查询后实时BM25再关键词召回
chroma向量召回

大规模企业 10万文档以下
PostgreSQL  节点+关键词;直接关键词召回
(使用 PostgreSQL 全文检索(tsvector,适合英文/分词后中文)
直接SQL语句 需要提前使用jieba分词)
把 PostgreSQL检索封装成 BaseRetriever
Milvus向量召回

发表在 None | 留下评论

文档表 node信息json字段保存,metadata信息JSON保存

  • 文档存储的node 信息 全部json的保存
  • 文件大小
  • 文件创建
  • 修改时间
  • 上传人
  • 上传部门
  • 上传tag
  • 上传tag
  • doc_id
  • metadata 的一个JSON表
  • json列表 :node_id
print("📌 【原始Document文档ID】:", new_docs[0].id_)  # fa.txt对应的文档根ID
print("\n📌 【切分后Node节点ID列表】(落盘JSON文件名):")
for node in new_nodes:
    print(f"→ 节点ID:{node.id_} | 关联文档ID:{node.metadata['document_id']}")

文档.id_.  ==node.metadata['document_id']
node.id_
def load_all_nodes() -> list[BaseNode]:
    return [BaseNode.from_dict(json.loads(p.read_text(encoding="utf8")))
            for p in NODE_DIR.glob("*.json")]

关键词召回 BM25
这里ODE_DIR.glob("*.json")也可以使用数据库生成一个node的json文件类表 此处缓存数据库查询后返回的node信息比较好
实现精确控制对吧
比如 数据库 查询出 
综合部 有2000个node节点。
然后 给出列表 
进行bm25 
然后再查询

向量召回
使用刚刚数据库查询出来的document_id 来指定metadata['document_id']后检索
再向量召回

SQLITE查询载入node信息

def load_all_nodes() -> List[BaseNode]:
    """
    从 SQLite 数据库加载所有节点。
    假设表 `nodes` 中有字段: id (TEXT), node_json (TEXT)
    """
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()

    try:
        cursor.execute("SELECT node_json FROM nodes")
        rows = cursor.fetchall()
    finally:
        conn.close()

    nodes = []
    for (node_json_str,) in rows:
        # 将 JSON 字符串解析为 dict,再反序列化为 BaseNode
        node_dict = json.loads(node_json_str)
        node = BaseNode.from_dict(node_dict)
        nodes.append(node)

    return nodes

根据metadata查询并返回node信息

    """
    根据 department 值,从 SQLite 查询 metadata 中匹配的节点。
    
    要求表结构:
        CREATE TABLE nodes (
            id TEXT PRIMARY KEY,
            metadata TEXT NOT NULL,   -- 存 JSON 字符串,如 '{"department": "综合部", ...}'
            node_json TEXT NOT NULL   -- 完整 BaseNode 的 JSON
        );
    """

def load_all_nodes(department: str) -> List[BaseNode]:
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    try:
        # 使用 ? 占位符,安全传参
        cursor.execute(
            "SELECT node_json FROM nodes WHERE json_extract(metadata, '$.department') = ?",
            (department,)
        )
        rows = cursor.fetchall()
    finally:
        conn.close()
    # 反序列化为 BaseNode 列表
    nodes = []
    for (node_json_str,) in rows:
        node_dict = json.loads(node_json_str)
        node = BaseNode.from_dict(node_dict)
        nodes.append(node)
    return nodes

retriever = BM25Retriever.from_defaults(
        nodes=nodes,
        similarity_top_k=top_k
    )
发表在 None | 留下评论

llamaindex节点 存储JSON 载入 增删


  1. 节点 ↔ JSON(落盘 & 载入) 每个node保存一个文件的节点名即ID值
from pathlib import Path
import json
from llama_index.core.schema import TextNode, BaseNode

NODE_DIR = Path("index_storage/nodes")
NODE_DIR.mkdir(parents=True, exist_ok=True)

# ---- 保存 ----
def save_nodes(nodes: list[BaseNode]):
    for n in nodes:
        (NODE_DIR / f"{n.id_}.json").write_text(
            json.dumps(n.to_dict(), ensure_ascii=False), encoding="utf8")

# ---- 加载 ----
def load_all_nodes() -> list[BaseNode]:
    return [BaseNode.from_dict(json.loads(p.read_text(encoding="utf8")))
            for p in NODE_DIR.glob("*.json")]

  1. 节点添加(增量文件示例)
from llama_index.core import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter

splitter = SentenceSplitter(chunk_size=512, chunk_overlap=30)

# 新增文件 → 切节点 → 落盘
new_docs = SimpleDirectoryReader(input_files=["new_data/fa.txt"]).load_data()
new_nodes = splitter.get_nodes_from_documents(new_docs)
save_nodes(new_nodes)          # 只写新增

  1. 节点删除(按文档 ID 批量删)
def delete_nodes_by_docid(doc_id: str):
    """把属于同一篇 doc_id 的所有节点文件删掉"""
    for p in NODE_DIR.glob("*.json"):
        node = BaseNode.from_dict(json.loads(p.read_text(encoding="utf8")))
        if node.metadata.get("doc_id") == doc_id:   # 或你自己约定的字段
            p.unlink()

调用示例:

delete_nodes_by_docid("doc_xxx")   # 删掉整篇文档对应的所有节点

如果数据库记录doc_id对应的node_id那么可以

def delete_nodes_by_nodeid(node_id: str):
    if NODE_DIR/f"{node_id}.json").exists():
        return NODE_DIR/f"{node_id}.json").unlink()
    else:
        return False

  1. 完整增量流程(增/删后重建 BM25)
all_nodes = load_all_nodes()       # 含历史+新增
# 如果前面删过,这里自然就不包含被删的节点
retriever = ChineseBM25Retriever(nodes=all_nodes, similarity_top_k=10)

至此:

  • 节点以可读 JSON 形式永久存放
  • 增 / 删都只是文件级操作,无需碰任何私有属性
  • 重建 BM25 只需重新 ChineseBM25Retriever(nodes=..., ...)2 min 内完成 100 万篇

全盘载入JSON

from pathlib import Path
import json
from llama_index.core.schema import BaseNode

NODE_DIR = Path("index_storage/nodes")

def load_all_nodes() -> list[BaseNode]:
    """一节点一文件 → 一次性全载入"""
    return [BaseNode.from_dict(json.loads(p.read_text(encoding="utf8")))
            for p in NODE_DIR.glob("*.json")]

# 后面想干嘛就干嘛
all_nodes = load_all_nodes()
retriever = ChineseBM25Retriever(nodes=all_nodes, similarity_top_k=10)
发表在 None | 留下评论