月度归档:2025年10月

OCR MinerU2.5

#UV


python -m pip install -U pip uv -i https://mirrors.aliyun.com/pypi/simple
uv pip install -U "mineru[core]" -i https://mirrors.aliyun.com/pypi/simple


#---



pip install -U "mineru[core]" -i https://mirrors.aliyun.com/pypi/simple

# 3. 下载模型权重(国内镜像)
 三种方法
export MINERU_MODEL_SOURCE=modelscope
#linux


$Env:MINERU_MODEL_SOURCE = "modelscope"
#windows powershell

set MINERU_MODEL_SOURCE=modelscope
#windows CMD

mineru-models-download        # 首次运行会自动生成 ~/mineru.json 配置
#会让你选择下载源 modelscope
#选择下载类型 all


# 查看版本
mineru --version
# 示例:mineru 0.9.2

# 跑一张 CPU 单线程测试
mineru -p sample.pdf -o out_dir

#out_dir下有一个 sample.md文件  可以给LLM识别 
#文本识别模型可以使用! qwen3 也可以

Saya 0.0.2 版本

MCP server交流:通过ollama 的交流模型,让其AI判断是否要调用记忆接口,然后通过

服务一:记忆接口,使用llamaindex 记忆模块

MCP负责交流所有的交流记忆信息保存为文本。 然后自动提交到llamaindex 的data 重新向量化

LLM 的设置 在mcp中使用prompt 设置相关信息

TAG

LlamaIndex 没有叫“遗忘参数”的单一旋钮,但把“人类式遗忘”拆成了 时间衰减、重要性加权、容量淘汰、手动删除 四条主线,组合后就能逼近 Ebbinghaus 曲线。

LLAMAINDEX一个就可以完成 对话记忆+文件记忆

llamaindex 放入PDF docx xlsx

安装CNORCR

pip install llama-index-readers-file pymupdf docx2txt openpyxl pandas
pip install cnocr[ort-cpu]
pip install watchdog

CNOCR下载模型,先创建脚本 自动下载:

# 在一台能上网的机器执行一次即可
from cnocr import CnOcr
_ = CnOcr()                     # 首次会自动把检测+识别模型
                                # 下载到 ~/cnocr/2.3 目录
# 把整个 ~/cnocr 文件夹拷到离线机同名路径即可

执行

# =================  依赖安装  =================
# pip install llama-index llama-index-llms-ollama llama-index-embeddings-ollama
# pip install cnocr watchdog fitz docx2txt pandas openpyxl

from pathlib import Path
import time, hashlib

from llama_index.core import Settings, Document, VectorStoreIndex
from llama_index.llms.ollama import Ollama
from llama_index.embeddings.ollama import OllamaEmbedding
from cnocr import CnOcr
import fitz, docx2txt, pandas as pd
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

# ================= 全局模型 =================
Settings.llm = Ollama(model="gemma:4b", request_timeout=600)
Settings.embed_model = OllamaEmbedding(model_name="embeddinggemma")

ocr = CnOcr(root="~/cnocr/2.3")   # 预下载权重目录

# ================= 文件读取器 =================
class LocalFileReader:
    def load_data(self, file_path: str):
        p = Path(file_path)
        if p.suffix.lower() == ".pdf":
            doc = fitz.open(p)
            text = ""
            for page in doc:
                out = ocr.ocr(page.get_pixmap().tobytes("png"))
                text += "\n".join(["".join(line) for line in out])
            return [Document(text=text, metadata={"file_name": p.name})]

        if p.suffix.lower() in [".docx", ".doc"]:
            txt = docx2txt.process(p)
            return [Document(text=txt, metadata={"file_name": p.name})]

        if p.suffix.lower() in [".xlsx", ".xls"]:
            dfs = pd.read_excel(p, sheet_name=None)
            txt = "\n".join(f"【{s}】\n{d.to_csv(index=False, sep='\t')}" for s, d in dfs.items())
            return [Document(text=txt, metadata={"file_name": p.name})]

        if p.suffix.lower() in [".png", ".jpg", ".jpeg"]:
            out = ocr.ocr(str(p))
            txt = "\n".join(["".join(line) for line in out])
            return [Document(text=txt, metadata={"file_name": p.name})]

        # txt / md
        from llama_index.core import SimpleDirectoryReader
        return SimpleDirectoryReader(input_files=[p]).load_data()

# ================= 工具:计算文件 hash =================
def file_hash(path: Path, chunk=8192):
    h = hashlib.md5()
    with open(path, "rb") as f:
        while chunk := f.read(chunk):
            h.update(chunk)
    return h.hexdigest()

# ================= 目录扫描 + 索引构建 =================
DATA_DIR = Path("data")
SUPPORTED = (".pdf", ".docx", ".doc", ".xlsx", ".xls", ".png", ".jpg", ".jpeg", ".txt", ".md")

def build_index():
    all_docs = []
    for p in DATA_DIR.rglob("*"):
        if p.is_file() and p.suffix.lower() in SUPPORTED:
            all_docs += LocalFileReader().load_data(p)
    return VectorStoreIndex.from_documents(all_docs)

# ================= 监视器:文件变化自动重建 =================
class DataHandler(FileSystemEventHandler):
    def __init__(self, rebuild_func):
        self.rebuild = rebuild_func

    def on_any_event(self, event):
        if event.is_directory:
            return
        if Path(event.src_path).suffix.lower() in SUPPORTED:
            print(f"[watchdog] {event.event_type} -> {event.src_path}")
            self.rebuild()

def rebuild():
    global query_engine
    print("[rebuild] 正在重新索引...")
    new_index = build_index()
    query_engine = new_index.as_query_engine()
    print("[rebuild] 完成!")

# ================= 首次建库 =================
query_engine = build_index().as_query_engine()

# ================= 启动监视 =================
observer = Observer()
observer.schedule(DataHandler(rebuild), str(DATA_DIR), recursive=True)
observer.start()

# ================= 交互 / 测试 =================
if __name__ == "__main__":
    try:
        while True:
            q = input("\n请输入问题(q 退出):").strip()
            if q.lower() == "q":
                break
            resp = query_engine.query(q)
            print(resp)
    except KeyboardInterrupt:
        pass
    finally:
        observer.stop()
        observer.join()

FASTAPI 开放API

from pathlib import Path
from fastapi import FastAPI
from pydantic import BaseModel

from llama_index.core import Settings, Document, VectorStoreIndex
from llama_index.llms.ollama import Ollama
from llama_index.embeddings.ollama import OllamaEmbedding
from cnocr import CnOcr
import fitz, docx2txt, pandas as pd

# ---------------- 模型 ----------------
Settings.llm      = Ollama(model="gemma:4b", request_timeout=600)
Settings.embed_model = OllamaEmbedding(model_name="embeddinggemma")
ocr = CnOcr(root="~/cnocr/2.3")

# ---------------- 文件读取器 ----------------
class LocalFileReader:
    def load_data(self, file_path: str):
        p = Path(file_path)
        if p.suffix.lower() == ".pdf":
            doc = fitz.open(p)
            text = ""
            for page in doc:
                out = ocr.ocr(page.get_pixmap().tobytes("png"))
                text += "\n".join(["".join(line) for line in out])
            return [Document(text=text, metadata={"file_name": p.name})]

        if p.suffix.lower() in [".docx", ".doc"]:
            txt = docx2txt.process(p)
            return [Document(text=txt, metadata={"file_name": p.name})]

        if p.suffix.lower() in [".xlsx", ".xls"]:
            dfs = pd.read_excel(p, sheet_name=None)
            txt = "\n".join(f"【{s}】\n{d.to_csv(index=False, sep='\t')}" for s, d in dfs.items())
            return [Document(text=txt, metadata={"file_name": p.name})]

        if p.suffix.lower() in [".png", ".jpg", ".jpeg"]:
            out = ocr.ocr(str(p))
            txt = "\n".join(["".join(line) for line in out])
            return [Document(text=txt, metadata={"file_name": p.name})]

        from llama_index.core import SimpleDirectoryReader
        return SimpleDirectoryReader(input_files=[p]).load_data()

# ---------------- 建索引(启动时一次) ----------------
DATA_DIR = Path("data")
SUPPORTED = (".pdf", ".docx", ".doc", ".xlsx", ".xls", ".png", ".jpg", ".jpeg", ".txt", ".md")

all_docs = []
for p in DATA_DIR.rglob("*"):
    if p.is_file() and p.suffix.lower() in SUPPORTED:
        all_docs += LocalFileReader().load_data(p)

index = VectorStoreIndex.from_documents(all_docs)
query_engine = index.as_query_engine()

# ---------------- FastAPI ----------------
app = FastAPI(title="LocalRAG")

class Q(BaseModel):
    question: str

@app.post("/ask")
def ask(q: Q):
    return {"answer": str(query_engine.query(q.question))}

# ---------------- 启动命令 ----------------
# uvicorn api:app --host 0.0.0.0 --port 8000

llama-index RAG 配合ollama

安装llama-index

pip install llama-index

安装ollama的对应模块

pip install llama-index-llms-ollama llama-index-embeddings-ollama

#安装模型
ollama pull embeddinggemma
ollama pull gemma:4b
#可以换模型

默认以当前的目录下的data 文件夹下文件(后期插入文件可以定时执行来刷新RAG)

创建测试代码 app.py

#!/usr/bin/env python3
# demo.py : LlamaIndex + Ollama 本地问答
import os
from llama_index.core import (
    VectorStoreIndex,
    SimpleDirectoryReader,
    Settings,
    StorageContext,
    load_index_from_storage,
)
from llama_index.llms.ollama import Ollama
from llama_index.embeddings.ollama import OllamaEmbedding

# 1. 全局模型配置
OLLAMA_URL = "http://127.0.0.1:11434"      # 默认端口
EMBED_MODEL = "embeddinggemma"           # 向量模型
LLM_MODEL = "gemma3:4b"                     # 生成模型

Settings.embed_model = OllamaEmbedding(
    model_name=EMBED_MODEL, base_url=OLLAMA_URL
)
Settings.llm = Ollama(
    model=LLM_MODEL, base_url=OLLAMA_URL, request_timeout=360.0
)

# 2. 持久化目录(同一目录二次启动可直接加载索引,无需重新解析)
PERSIST_DIR = "./storage"

# 3. 加载或构建索引
if not os.path.exists(PERSIST_DIR):
    print(">>> 首次运行,解析文档并构建索引 …")
    documents = SimpleDirectoryReader("data").load_data()
    index = VectorStoreIndex.from_documents(documents)
    index.storage_context.persist(persist_dir=PERSIST_DIR)
else:
    print(">>> 发现已有索引,直接加载 …")
    storage_context = StorageContext.from_defaults(persist_dir=PERSIST_DIR)
    index = load_index_from_storage(storage_context)

# 4. 创建查询引擎
query_engine = index.as_query_engine(similarity_top_k=3)

# 5. 简单交互
if __name__ == "__main__":
    print("===  LlamaIndex + Ollama 本地问答  ===")
    while True:
        q = input("\n问题 (q 退出): ").strip()
        if q.lower() == "q":
            break
        resp = query_engine.query(q)
        print("答:", resp)

定时刷新 使用watchdog

pip install watchdog

定时刷新data目录

#!/usr/bin/env python3
# demo.py : LlamaIndex + Ollama 本地问答 + 定时自动刷新索引
import os, time, threading
from pathlib import Path
from llama_index.core import (
    VectorStoreIndex,
    SimpleDirectoryReader,
    Settings,
    StorageContext,
    load_index_from_storage,
)
from llama_index.llms.ollama import Ollama
from llama_index.embeddings.llama_index.embeddings.ollama import OllamaEmbedding

# -------------------- 1. 全局模型配置 --------------------
OLLAMA_URL = "http://127.0.0.1:11434"
EMBED_MODEL = "nomic-embed-text"   # 可自行替换
LLM_MODEL = "llama3.1:8b"          # 可自行替换

Settings.embed_model = OllamaEmbedding(model_name=EMBED_MODEL, base_url=OLLAMA_URL)
Settings.llm = Ollama(model=LLM_MODEL, base_url=OLLAMA_URL, request_timeout=360.0)

# -------------------- 2. 参数 --------------------
DATA_DIR = Path("data")
PERSIST_DIR = Path("./storage")
RESCAN_INTERVAL = 5 * 60           # 秒,5 分钟扫一次

# -------------------- 3. 索引管理 --------------------
def build_index():
    """重新解析文档并持久化索引"""
    print("[index] 正在重新构建索引 …")
    documents = SimpleDirectoryReader(str(DATA_DIR)).load_data()
    index = VectorStoreIndex.from_documents(documents)
    index.storage_context.persist(persist_dir=str(PERSIST_DIR))
    print("[index] 索引已更新完成")
    return index

def load_or_build():
    """启动时:有缓存就加载,没有就新建"""
    if PERSIST_DIR.exists():
        print(">>> 发现已有索引,直接加载 …")
        storage_context = StorageContext.from_defaults(persist_dir=str(PERSIST_DIR))
        return load_index_from_storage(storage_context)
    else:
        print(">>> 首次运行,解析文档并构建索引 …")
        return build_index()

# -------------------- 4. 定时刷新线程 --------------------
def md5_dir():
    """简易指纹:拼接所有文件「路径+修改时间」"""
    return "|".join(
        f"{p.relative_to(DATA_DIR)}:{p.stat().st_mtime}"
        for p in sorted(DATA_DIR.rglob("*"))
        if p.is_file()
    )

def watcher():
    """后台线程:循环扫描目录,有变化就重建"""
    last_md5 = md5_dir()
    while True:
        time.sleep(RESCAN_INTERVAL)
        new_md5 = md5_dir()
        if new_md5 != last_md5:
            last_md5 = new_md5
            global index, query_engine
            index = build_index()
            query_engine = index.as_query_engine(similarity_top_k=3)
        else:
            print("[index] 目录无变化,跳过刷新")

# -------------------- 5. 启动 --------------------
index = load_or_build()
query_engine = index.as_query_engine(similarity_top_k=3)

threading.Thread(target=watcher, daemon=True).start()

# -------------------- 6. 交互 --------------------
if __name__ == "__main__":
    print("===  LlamaIndex + Ollama 本地问答 (定时刷新已开启) ===")
    while True:
        q = input("\n问题 (q 退出): ").strip()
        if q.lower() == "q":
            break
        resp = query_engine.query(q)
        print("答:", resp)

模拟人类记忆 海马体

其中 data存放外部数据,对话内容产生记忆。

  1. 短期记忆
    就是最近 N 轮问答文本本身ChatMemoryBuffer 里的 FIFO 队列)。
    用户每输入一个问题、模型每给出一次回答,都会被原封不动地压进队列;当总 token 数超过 SHORT_MEMORY_TOKENS 时,最老的整轮对话会被整体弹出——这就是即时遗忘
  2. 长期记忆
    由后台线程从“刚刚发生”的问答文本里二次加工产生:
    • 事实记忆:用非常简陋的「A is B」正则把句子拆成 (主语, 谓词, 宾语) 三元组,再写进 FactMemoryBlock;容量满或长期未用即被淘汰。
    • 语义向量:把整句/片段直接做成向量入库,后续用相似度召回;分数会随时间衰减,低于阈值或总量超限即被删除。

也就是说,没有对话就没有任何记忆可存;文档(data/目录)只提供“外部知识”,不会被自动写回记忆系统。
如果你想让模型把阅读到的文档内容也记到长期记忆里,只需要在 build_index() 之后把 documents 遍历一遍,手动调用:

for doc in documents:
    hippo.vector.add(doc.text) # 语义记忆
# 或抽实体后 hippo.fact.add(…) # 事实记忆
#!/usr/bin/env python3
# demo_memory.py : LlamaIndex + Ollama + 海马体式「记忆–遗忘」
import os
import time
import threading
from collections import deque
from datetime import datetime
from pathlib import Path

from llama_index.core import (
    VectorStoreIndex,
    SimpleDirectoryReader,
    Settings,
    StorageContext,
    load_index_from_storage,
)
from llama_index.llms.ollama import Ollama
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.schema import TextNode
from llama_index.core.vector_stores import SimpleVectorStore

# -------------------- 1. 全局模型 --------------------
OLLAMA_URL = "http://127.0.0.1:11434"
EMBED_MODEL = "nomic-embed-text"
LLM_MODEL = "llama3.1:8b"
Settings.embed_model = OllamaEmbedding(model_name=EMBED_MODEL, base_url=OLLAMA_URL)
Settings.llm = Ollama(model=LLM_MODEL, base_url=OLLAMA_URL, request_timeout=360.0)

# -------------------- 2. 参数 --------------------
DATA_DIR = Path("data")
PERSIST_DIR = Path("./storage")
MEMORY_DIR = PERSIST_DIR / "memory"          # 长期记忆持久化
RESCAN_INTERVAL = 5 * 60
SHORT_MEMORY_TOKENS = 2000                   # 短期记忆 token 上限
FACT_LIMIT = 200                             # 长期事实上限
VECTOR_LIMIT = 1000                          # 语义向量上限
DECAY_HALF_DAY = 0.9                         # 每 12h 衰减系数

# -------------------- 3. 轻量级 MemoryBlock --------------------
class FactMemoryBlock:
    """结构化事实:【subject】【predicate】【object】+ 时间戳 + 使用次数"""
    def __init__(self, limit=FACT_LIMIT):
        self.limit = limit
        self.facts = []          # [(sub, pred, obj, ts, count), ...]
        self.load()

    def add(self, sub, pred, obj):
        now = datetime.utcnow()
        # 简单合并:同一 (sub,pred) 只保留最新 obj
        self.facts = [(s, p, o, ts, c) for (s, p, o, ts, c) in self.facts if not (s == sub and p == pred)]
        self.facts.append((sub, pred, obj, now, 1))
        if len(self.facts) > self.limit:
            # 淘汰最久未用(count 最小且最旧)
            self.facts.sort(key=lambda x: (x[4], x[3]))
            self.facts = self.facts[-self.limit:]

    def retrieve(self, query_sub, topk=5):
        """朴素字符串匹配 + 使用次数加权"""
        hits = []
        for sub, pred, obj, ts, c in self.facts:
            score = c
            if query_sub.lower() in sub.lower():
                score += 5
            hits.append((score, sub, pred, obj))
        hits.sort(reverse=True)
        return hits[:topk]

    def persist(self):
        MEMORY_DIR.mkdir(exist_ok=True)
        with open(MEMORY_DIR / "facts.txt", "w", encoding="utf8") as f:
            for sub, pred, obj, ts, c in self.facts:
                f.write(f"{sub}\t{pred}\t{obj}\t{ts.isoformat()}\t{c}\n")

    def load(self):
        if (MEMORY_DIR / "facts.txt").exists():
            with open(MEMORY_DIR / "facts.txt", encoding="utf8") as f:
                for line in f:
                    sub, pred, obj, ts_str, c = line.strip().split("\t")
                    self.facts.append((sub, pred, obj, datetime.fromisoformat(ts_str), int(c)))


class VectorMemoryBlock:
    """语义向量:用 SimpleVectorStore 存节点,附加时间戳 & 分数"""
    def __init__(self, limit=VECTOR_LIMIT, decay=DECAY_HALF_DAY):
        self.limit = limit
        self.decay = decay
        self.store = SimpleVectorStore()
        self.nodes = []          # 本地缓存 (node, ts, score)
        if (MEMORY_DIR / "vector_store.json").exists():
            self.store = SimpleVectorStore.from_persist_path(str(MEMORY_DIR / "vector_store.json"))
            # 简版:只加载最近 limit 条
        self._trim()

    def add(self, text):
        now = datetime.utcnow()
        node = TextNode(text=text)
        self.nodes.append((node, now, 1.0))
        self.store.add([node])
        self._trim()

    def retrieve(self, query_str, top_k=3):
        """向量检索 + 时间衰减分数重排"""
        emb = Settings.embed_model.get_text_embedding(query_str)
        hits = self.store.query(embedding=emb, similarity_top_k=top_k * 2)
        res = []
        for n, ts, score in self.nodes:
            hours = (datetime.utcnow() - ts).total_seconds() / 3600
            decayed = score * (self.decay ** (hours / 12))
            for hit in hits:
                if hit.node.node_id == n.node_id:
                    res.append((decayed * hit.score, n.text))
                    break
        res.sort(reverse=True)
        return [txt for _, txt in res[:top_k]]

    def _trim(self):
        if len(self.nodes) > self.limit:
            # 按分数+时间排序淘汰
            self.nodes.sort(key=lambda x: (x[2], x[1]))
            drop = self.nodes[:len(self.nodes) - self.limit]
            self.nodes = self.nodes[-self.limit:]
            # 同步从 store 删除(简单重建)
            self.store = SimpleVectorStore()
            self.store.add([n for n, _, _ in self.nodes])

    def persist(self):
        MEMORY_DIR.mkdir(exist_ok=True)
        self.store.persist(str(MEMORY_DIR / "vector_store.json"))


# -------------------- 4. 记忆管理器 --------------------
class HippocampusMemory:
    def __init__(self):
        self.short = ChatMemoryBuffer(token_limit=SHORT_MEMORY_TOKENS)
        self.fact = FactMemoryBlock()
        self.vector = VectorMemoryBlock()

    def add_interaction(self, user, assistant):
        # 短期记忆
        self.short.put(f"User: {user}\nAssistant: {assistant}")
        # 异步长期巩固(简单抽实体)
        threading.Thread(target self._consolidate, args=(user + " " + assistant,), daemon=True).start()

    def _consolidate(self, text):
        """简易实体→事实抽取 & 语义片段入库"""
        # 这里用 LLM 抽实体/摘要,为演示直接按逗号切
        for sent in text.split("."):
            sent = sent.strip()
            if len(sent) < 10:
                continue
            # 向量记忆
            self.vector.add(sent)
            # 事实记忆:简单「A 是 B」模式
            if " is " in sent:
                sub, obj = sent.split(" is ", 1)
                self.fact.add(sub.strip(), "is", obj.strip())

    def retrieve_context(self, query):
        # 短期
        short_hist = self.short.get()
        # 长期
        facts = self.fact.retrieve(query)
        vectors = self.vector.retrieve(query)
        long_part = "\n".join([f"{s} {p} {o}" for _, s, p, o in facts]) + "\n" + "\n".join(vectors)
        return f"短期记忆:\n{short_hist}\n\n长期记忆:\n{long_part}"

    def periodic_forget(self):
        """定时遗忘:长期记忆自己维护容量 & 衰减"""
        self.fact.persist()
        self.vector.persist()


hippo = HippocampusMemory()

# -------------------- 5. 索引管理(同你原来) --------------------
def build_index():
    print("[index] 正在重新构建索引 …")
    documents = SimpleDirectoryReader(str(DATA_DIR)).load_data()
    index = VectorStoreIndex.from_documents(documents)
    index.storage_context.persist(persist_dir=str(PERSIST_DIR))
    return index

def load_or_build():
    if PERSIST_DIR.exists():
        storage_context = StorageContext.from_defaults(persist_dir=str(PERSIST_DIR))
        return load_index_from_storage(storage_context)
    else:
        return build_index()

# -------------------- 6. 后台线程 --------------------
def md5_dir():
    return "|".join(f"{p.relative_to(DATA_DIR)}:{p.stat().st_mtime}" for p in sorted(DATA_DIR.rglob("*")) if p.is_file())

def watcher():
    last_md5 = md5_dir()
    while True:
        time.sleep(RESCAN_INTERVAL)
        new_md5 = md5_dir()
        if new_md5 != last_md5:
            last_md5 = new_md5
            global index
            index = build_index()
        hippo.periodic_forget()

# -------------------- 7. 启动 --------------------
index = load_or_build()
threading.Thread(target=watcher, daemon=True).start()

# -------------------- 8. 交互 --------------------
if __name__ == "__main__":
    print("===  LlamaIndex + Ollama + 海马体记忆 (q 退出) ===")
    while True:
        q = input("\n问题: ").strip()
        if q.lower() == "q":
            break
        # 把记忆作为上下文拼到 prompt
        context = hippo.retrieve_context(q)
        prompt = f"以下背景知识供参考:\n{context}\n\n用户问题:{q}"
        resp = index.as_query_engine(similarity_top_k=3).query(prompt)
        print("答:", resp)
        hippo.add_interaction(q, str(resp))

结合文档变成长期记忆

#!/usr/bin/env python3
# hippo_bot.py : LlamaIndex + Ollama + 海马体记忆(对话产生 + 文档产生)
import os
import time
import threading
from collections import deque
from datetime import datetime
from pathlib import Path

from llama_index.core import (
    VectorStoreIndex,
    SimpleDirectoryReader,
    Settings,
    StorageContext,
    load_index_from_storage,
    Document,
)
from llama_index.llms.ollama import Ollama
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.schema import TextNode
from llama_index.core.vector_stores import SimpleVectorStore

# -------------------- 1. 模型配置 --------------------
OLLAMA_URL = "http://127.0.0.1:11434"
EMBED_MODEL = "nomic-embed-text"
LLM_MODEL = "llama3.1:8b"
Settings.embed_model = OllamaEmbedding(model_name=EMBED_MODEL, base_url=OLLAMA_URL)
Settings.llm = Ollama(model=LLM_MODEL, base_url=OLLAMA_URL, request_timeout=360.0)

# -------------------- 2. 目录/常数 --------------------
DATA_DIR = Path("data")
PERSIST_DIR = Path("./storage")
MEMORY_DIR = PERSIST_DIR / "memory"
RESCAN_INTERVAL = 5 * 60            # 5 分钟扫一次
SHORT_TOKENS = 2000                 # 短期记忆 token 上限
FACT_LIMIT = 200                    # 事实上限
VECTOR_LIMIT = 1000                 # 语义片上限
DECAY = 0.9                         # 每 12h 衰减系数

MEMORY_DIR.mkdir(parents=True, exist_ok=True)

# -------------------- 3. 长期记忆块(同上一版) ----------
class FactMemoryBlock:
    def __init__(self, limit=FACT_LIMIT):
        self.limit = limit
        self.facts = []          # [(sub, pred, obj, ts, count), ...]
        self.load()

    def add(self, sub, pred, obj):
        now = datetime.utcnow()
        self.facts = [(s, p, o, ts, c) for (s, p, o, ts, c) in self.facts if not (s == sub and p == pred)]
        self.facts.append((sub.strip(), pred.strip(), obj.strip(), now, 1))
        if len(self.facts) > self.limit:
            self.facts.sort(key=lambda x: (x[4], x[3]))
            self.facts = self.facts[-self.limit:]

    def retrieve(self, query_sub, topk=5):
        hits = []
        for sub, pred, obj, ts, c in self.facts:
            score = c
            if query_sub.lower() in sub.lower():
                score += 5
            hits.append((score, sub, pred, obj))
        hits.sort(reverse=True)
        return hits[:topk]

    def persist(self):
        with open(MEMORY_DIR / "facts.txt", "w", encoding="utf8") as f:
            for sub, pred, obj, ts, c in self.facts:
                f.write(f"{sub}\t{pred}\t{obj}\t{ts.isoformat()}\t{c}\n")

    def load(self):
        p = MEMORY_DIR / "facts.txt"
        if p.exists():
            with p.open(encoding="utf8") as f:
                for line in f:
                    sub, pred, obj, ts_str, c = line.strip().split("\t")
                    self.facts.append((sub, pred, obj, datetime.fromisoformat(ts_str), int(c)))


class VectorMemoryBlock:
    def __init__(self, limit=VECTOR_LIMIT, decay=DECAY):
        self.limit = limit
        self.decay = decay
        self.store = SimpleVectorStore()
        self.nodes = []          # [(node, ts, score), ...]
        if (MEMORY_DIR / "vector_store.json").exists():
            self.store = SimpleVectorStore.from_persist_path(str(MEMORY_DIR / "vector_store.json"))
        self._trim()

    def add(self, text):
        now = datetime.utcnow()
        node = TextNode(text=text)
        self.nodes.append((node, now, 1.0))
        self.store.add([node])
        self._trim()

    def retrieve(self, query_str, top_k=3):
        emb = Settings.embed_model.get_text_embedding(query_str)
        hits = self.store.query(embedding=emb, similarity_top_k=top_k * 2)
        res = []
        for n, ts, score in self.nodes:
            hours = (datetime.utcnow() - ts).total_seconds() / 3600
            decayed = score * (self.decay ** (hours / 12))
            for hit in hits:
                if hit.node.node_id == n.node_id:
                    res.append((decayed * hit.score, n.text))
                    break
        res.sort(reverse=True)
        return [txt for _, txt in res[:top_k]]

    def _trim(self):
        if len(self.nodes) > self.limit:
            self.nodes.sort(key=lambda x: (x[2], x[1]))
            drop = self.nodes[:len(self.nodes) - self.limit]
            self.nodes = self.nodes[-self.limit:]
            self.store = SimpleVectorStore()
            self.store.add([n for n, _, _ in self.nodes])

    def persist(self):
        self.store.persist(str(MEMORY_DIR / "vector_store.json"))


# -------------------- 4. 海马体记忆管理器 --------------------
class HippocampusMemory:
    def __init__(self):
        self.short = ChatMemoryBuffer(token_limit=SHORT_TOKENS)
        self.fact = FactMemoryBlock()
        self.vector = VectorMemoryBlock()

    # 1. 对话产生的记忆
    def add_dialog(self, user: str, assistant: str):
        self.short.put(f"User: {user}\nAssistant: {assistant}")
        # 异步巩固
        threading.Thread(target=self._consolidate, args=(user + " " + assistant,), daemon=True).start()

    # 2. 文档产生的记忆
    def add_documents(self, docs: list[Document]):
        for doc in docs:
            self.vector.add(doc.text)
            # 简单抽「A is B」
            for sent in doc.text.split("."):
                if " is " in sent:
                    sub, obj = sent.split(" is ", 1)
                    self.fact.add(sub.strip(), "is", obj.strip())

    def _consolidate(self, text: str):
        for sent in text.split("."):
            sent = sent.strip()
            if len(sent) < 10:
                continue
            self.vector.add(sent)
            if " is " in sent:
                sub, obj = sent.split(" is ", 1)
                self.fact.add(sub.strip(), "is", obj.strip())

    def retrieve(self, query: str) -> str:
        short = self.short.get()
        facts = self.fact.retrieve(query)
        vectors = self.vector.retrieve(query)
        long = "\n".join([f"{s} {p} {o}" for _, s, p, o in facts]) + "\n" + "\n".join(vectors)
        return f"短期记忆:\n{short}\n\n长期记忆:\n{long}"

    def periodic_persist(self):
        self.fact.persist()
        self.vector.persist()


hippo = HippocampusMemory()

# -------------------- 5. 索引管理 --------------------
def build_index():
    print("[index] 解析文档并构建索引 …")
    documents = SimpleDirectoryReader(str(DATA_DIR)).load_data()
    index = VectorStoreIndex.from_documents(documents)
    index.storage_context.persist(persist_dir=str(PERSIST_DIR))
    # ❗新增:把文档内容也写进长期记忆
    hippo.add_documents(documents)
    print("[index] 索引+记忆已更新")
    return index

def load_or_build():
    if PERSIST_DIR.exists():
        print(">>> 加载已有索引 …")
        storage_context = StorageContext.from_defaults(persist_dir=str(PERSIST_DIR))
        return load_index_from_storage(storage_context)
    else:
        return build_index()

# -------------------- 6. 后台定时线程 --------------------
def md5_dir():
    return "|".join(f"{p.relative_to(DATA_DIR)}:{p.stat().st_mtime}" for p in sorted(DATA_DIR.rglob("*")) if p.is_file())

def watcher():
    last_md5 = md5_dir()
    while True:
        time.sleep(RESCAN_INTERVAL)
        new_md5 = md5_dir()
        if new_md5 != last_md5:
            last_md5 = new_md5
            global index
            index = build_index()
        else:
            print("[index] 无变化,跳过刷新")
        hippo.periodic_persist()


index = load_or_build()
threading.Thread(target=watcher, daemon=True).start()

# -------------------- 7. 交互 --------------------------
if __name__ == "__main__":
    print("===  LlamaIndex + Ollama + 海马体记忆(q 退出)===")
    while True:
        q = input("\n问题: ").strip()
        if q.lower() == "q":
            break
        context = hippo.retrieve(q)
        prompt = f"以下背景知识供参考:\n{context}\n\n用户问题:{q}"
        resp = index.as_query_engine(similarity_top_k=3).query(prompt)
        print("答:", resp)
        hippo.add_dialog(q, str(resp))

解释

二、逐模块详细讲解

  1. 模型配置
    与原来完全一致,只是将 OLLAMA_URL 等参数收归到顶部,方便一键修改。
  2. 目录与常数
    • DATA_DIR:你的本地文档目录。
    • PERSIST_DIR:LlamaIndex 的索引持久化位置。
    • MEMORY_DIR:新增,用于存放「长期事实」和「向量存储」的本地文件。
    • SHORT_TOKENS / FACT_LIMIT / VECTOR_LIMIT:三处记忆硬阈值,直接决定“遗忘”节奏。
  3. FactMemoryBlock
    用最简单的「三元组」模拟事实:
    • add():同一 (主语, 谓词) 只保留最新宾语;超限后按「使用次数少 + 时间旧」淘汰。
    • retrieve():支持模糊匹配主语,返回得分最高的 topk 条事实。
    • persist() / load():纯文本 tsv 落盘,删除即“全脑遗忘”。
  4. VectorMemoryBlock
    基于 SimpleVectorStore
    • add():任意文本 → 向量 → 入库。
    • retrieve():先向量相似检索,再对每条结果按「时间衰减」重新打分,返回 topk。
    • _trim():超限后整体重建向量库,保证只保留最近/最高分的 VECTOR_LIMIT 条。
  5. HippocampusMemory(核心)
    把「短/长」记忆统一收口:
    • add_dialog():每次对话后把整句写进短期 FIFO,并异步调用 _consolidate() 做「二次加工」→ 长期记忆。
    • add_documents():新增函数,在构建索引后立即把整篇文档内容灌进长期记忆(既做向量也抽事实),实现「阅读即记忆」。
    • retrieve():把三类记忆拼成一个字符串,直接塞给 LLM 当上下文。
    • periodic_persist():定时落盘,防止进程崩溃丢失。
  6. 索引管理
    build_index() 里多调用了一句 hippo.add_documents(documents),于是每次目录变更、重新解析文档后,新内容会自动进入长期记忆;其余逻辑与原 demo 保持一致。
  7. 后台线程 watcher
    仍负责「5 分钟扫一次目录」→ 有变化就重建索引 + 写记忆;扫完顺手 periodic_persist() 把记忆落盘。
  8. 交互循环
    • 每次提问先把 hippo.retrieve(query) 拿出来当上下文。
    • 拿到回答后再把本轮 (问题, 回答) 写回记忆,形成闭环。

三、运行效果速写

复制

===  LlamaIndex + Ollama + 海马体记忆(q 退出)===
>>> 加载已有索引 …
问题: 什么是量子计算?
答: 量子计算是利用量子叠加与纠缠特性进行并行计算的新型计算范式……
(后台异步把该问答句做向量+事实抽取,写进长期记忆)

...5 min 后你在 data/ 放了新文件...
[index] 解析文档并构建索引 …
[index] 索引+记忆已更新   ← 新文档内容已自动成为长期记忆

此时再提问涉及新文档的主题,LLM 会同时拿到:

  • 短期最近几轮对话
  • 长期事实三元组
  • 长期语义向量片段

实现「文档阅读 + 多轮对话」双层记忆的动态结合与自动遗忘。