LlamaIndex BM25: build, append, rebuild (with node info stored as JSON)

# -*- coding: utf-8 -*-
"""
llama-index-core 0.6.0 中文 BM25 增量索引
节点 -> JSON
BM5 每次重建(不再硬编码私有字段,永不出错)
"""
from __future__ import annotations
from typing import List
import json, jieba
from pathlib import Path

from llama_index.core import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter
from llama_index.retrievers.bm25 import BM25Retriever
from llama_index.core.schema import BaseNode, TextNode

# ---------------- Paths ----------------
STORAGE_ROOT = Path("index_storage")
NODES_DIR   = STORAGE_ROOT / "nodes"

def ensure_dir(p: Path) -> None:
    p.mkdir(parents=True, exist_ok=True)
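# Layout: each node is persisted as index_storage/nodes/<node_id>.json,
# so later runs can append new node files without rewriting existing ones.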

# ---------------- Node JSON serialization ----------------
def node_to_dict(node: BaseNode) -> dict:
    return {
        "id_": node.id_,
        "text": node.text,
        "metadata": node.metadata,
        "excluded_embed_metadata_keys": node.excluded_embed_metadata_keys,
        "excluded_llm_metadata_keys":   node.excluded_llm_metadata_keys,
    }

def dict_to_node(d: dict) -> TextNode:
    return TextNode(**d)

def save_nodes(nodes: List[BaseNode]):
    ensure_dir(NODES_DIR)
    for n in nodes:
        (NODES_DIR / f"{n.id_}.json").write_text(
            json.dumps(node_to_dict(n), ensure_ascii=False), encoding="utf8")

def load_all_nodes() -> List[BaseNode]:
    if not NODES_DIR.exists():
        return []
    return [dict_to_node(json.loads(p.read_text(encoding="utf8")))
            for p in NODES_DIR.glob("*.json")]
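# Round-trip sketch (illustrative values; assumes NODES_DIR starts empty):
#   n = TextNode(id_="demo-1", text="你好,世界", metadata={"source": "demo"})
#   save_nodes([n])
#   assert load_all_nodes()[0].text == n.text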

# ---------------- Chinese tokenization ----------------
def chinese_tokenizer(text: str) -> List[str]:
    return [w for w in jieba.cut(text) if w.strip()]

# BM25Retriever has no `_tokenize` hook, so overriding one in a subclass
# would be silently ignored. Inject jieba via the `tokenizer` argument
# instead (assumes the rank_bm25-based BM25Retriever, which accepts it).
class ChineseBM25Retriever(BM25Retriever):
    def __init__(self, *args, **kwargs):
        kwargs.setdefault("tokenizer", chinese_tokenizer)
        super().__init__(*args, **kwargs)
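# Tokenization example (typical jieba output; exact segmentation can vary
# by dictionary/version): list(jieba.cut("今天天气不错")) -> ['今天', '天气', '不错'],
# so queries match on whole words rather than raw characters.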

# ---------------- Core logic ----------------
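# Chunk into ~512 tokens apiece, with 30 tokens of overlap between chunks.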
splitter = SentenceSplitter(chunk_size=512, chunk_overlap=30)

def build_or_load_index():
    """Manage only the nodes; BM25 is rebuilt from scratch every time."""
    all_nodes = load_all_nodes()
    if all_nodes:
        print(f">>> Loaded {len(all_nodes)} existing nodes; rebuilding BM25...")
    else:
        print(">>> Building nodes for the first time...")
        documents = SimpleDirectoryReader("data").load_data()
        all_nodes = splitter.get_nodes_from_documents(documents)
        save_nodes(all_nodes)
    # ****** Rebuild BM25 every time; never touch any private attributes ******
    retriever = ChineseBM25Retriever(nodes=all_nodes, similarity_top_k=10)
    return all_nodes, retriever

def incremental_update(all_nodes: List[BaseNode]) -> ChineseBM25Retriever:
    """增量追加节点 -> 重建 BM5"""
    input_files = ["new_data/fa.txt", "new_data/fa2.txt"]
    new_docs = SimpleDirectoryReader(input_files=input_files).load_data()
    new_nodes = splitter.get_nodes_from_documents(new_docs)
    if not new_nodes:
        print(">>> 无新增文件,直接复用现有 BM5")
        return ChineseBM25Retriever(nodes=all_nodes, similarity_top_k=10)

    print(f">>> 增量新增 {len(new_nodes)} 个节点,重建 BM5...")
    save_nodes(new_nodes)
    all_nodes.extend(new_nodes)
    # ****** Rebuild BM25; this never breaks again ******
    return ChineseBM25Retriever(nodes=all_nodes, similarity_top_k=10)

# ---------------- Entry point ----------------
if __name__ == "__main__":
    all_nodes, retriever = build_or_load_index()
    retriever = incremental_update(all_nodes)
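    # Note: the retriever from build_or_load_index is immediately replaced;
    # in this demo the incremental update rebuilds BM25 over all nodes.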

    results = retriever.retrieve("what is look")
    print(f">>> 查询召回 {len(results)} 条")
    print(f">>> 总节点数 {len(all_nodes)}")
