#UV
python -m pip install -U pip uv -i https://mirrors.aliyun.com/pypi/simple
uv pip install -U "mineru[core]" -i https://mirrors.aliyun.com/pypi/simple
#---
pip install -U "mineru[core]" -i https://mirrors.aliyun.com/pypi/simple
# 3. Download the model weights (domestic mirror)
Three ways to set the model source:
# Linux
export MINERU_MODEL_SOURCE=modelscope
# Windows PowerShell
$Env:MINERU_MODEL_SOURCE = "modelscope"
# Windows CMD
set MINERU_MODEL_SOURCE=modelscope
mineru-models-download  # the first run auto-generates the ~/mineru.json config
# it will ask you to choose a download source: modelscope
# and a download type: all
# Check the version
mineru --version
# e.g.: mineru 0.9.2
# Run a single-threaded CPU test
mineru -p sample.pdf -o out_dir
# out_dir will contain a sample.md file that can be handed to an LLM
# text-recognition models handle it fine; qwen3 works too
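As a quick follow-up, a minimal sketch of feeding that Markdown to a local LLM over Ollama's HTTP API (assumes Ollama on its default port and the qwen3 tag mentioned above; the sample.md path follows the note above):
import requests  # pip install requests

md = open("out_dir/sample.md", encoding="utf8").read()
r = requests.post(
    "http://127.0.0.1:11434/api/generate",  # Ollama's generate endpoint
    json={"model": "qwen3", "prompt": f"Summarize this document:\n{md}", "stream": False},
)
print(r.json()["response"])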
Saya version 0.0.2
MCP server conversation: an Ollama chat model decides whether the memory interface needs to be called, then routes the call through MCP.
Service 1: the memory interface, built on the LlamaIndex memory module.
The MCP side saves all conversational memory as text, then automatically submits it into LlamaIndex's data directory for re-vectorization.
LLM setup: the relevant instructions are configured through prompts inside MCP.
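A rough sketch of what the memory interface could look like as an MCP tool, assuming the official mcp Python SDK (FastMCP); save_memory and the data/memory.txt path are hypothetical names here, and the LlamaIndex re-vectorization side is elided:
from mcp.server.fastmcp import FastMCP  # pip install mcp

mcp = FastMCP("saya-memory")

@mcp.tool()
def save_memory(text: str) -> str:
    """Append one round of conversation to the memory store (hypothetical: a text file that LlamaIndex later re-vectorizes)."""
    with open("data/memory.txt", "a", encoding="utf8") as f:
        f.write(text + "\n")
    return "saved"

if __name__ == "__main__":
    mcp.run()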
LlamaIndex has no single knob called a "forgetting parameter"; instead it decomposes human-style forgetting into four threads: time decay, importance weighting, capacity eviction, and manual deletion. Combined, these approximate the Ebbinghaus forgetting curve.
LlamaIndex alone can cover both conversational memory and document memory.
LlamaIndex can ingest PDF, docx, and xlsx files.
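A minimal sketch of how those four threads can compose into one score (my own illustration, not a LlamaIndex API; half_life_h, importance, and capacity are invented names):
import math, time

def memory_score(created_ts, importance, half_life_h=24.0):
    # Time decay: exponential drop-off approximating the Ebbinghaus curve,
    # weighted by per-memory importance
    age_h = (time.time() - created_ts) / 3600
    return importance * math.exp(-age_h / half_life_h)

def evict(memories, capacity=100):
    # Capacity eviction: keep the top-N by decayed score; the rest are "forgotten"
    return sorted(memories, key=lambda m: memory_score(m["ts"], m["importance"]), reverse=True)[:capacity]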
Install CnOCR
pip install llama-index-readers-file pymupdf docx2txt openpyxl pandas
pip install cnocr[ort-cpu]
pip install watchdog
CnOCR model download: first create a script that fetches the models automatically:
# Run this once on a machine with internet access
from cnocr import CnOcr
_ = CnOcr()  # the first run automatically downloads the detection + recognition models
# into the ~/cnocr/2.3 directory
# Copy the whole ~/cnocr folder to the same path on the offline machine
The full script:
# ================= Dependencies =================
# pip install llama-index llama-index-llms-ollama llama-index-embeddings-ollama
# pip install cnocr watchdog pymupdf docx2txt pandas openpyxl pillow numpy
from pathlib import Path
import io
import time, hashlib
import numpy as np
from PIL import Image
from llama_index.core import Settings, Document, VectorStoreIndex
from llama_index.llms.ollama import Ollama
from llama_index.embeddings.ollama import OllamaEmbedding
from cnocr import CnOcr
import fitz, docx2txt, pandas as pd
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

# ================= Global models =================
Settings.llm = Ollama(model="gemma3:4b", request_timeout=600)
Settings.embed_model = OllamaEmbedding(model_name="embeddinggemma")
ocr = CnOcr(root="~/cnocr/2.3")  # directory with the pre-downloaded weights
# ================= File reader =================
class LocalFileReader:
    def load_data(self, file_path: str):
        p = Path(file_path)
        if p.suffix.lower() == ".pdf":
            doc = fitz.open(p)
            text = ""
            for page in doc:
                # Render the page to an image; cnocr takes a path or ndarray, not raw PNG bytes
                img = np.array(Image.open(io.BytesIO(page.get_pixmap().tobytes("png"))))
                out = ocr.ocr(img)
                # cnocr 2.x returns a list of dicts with a "text" key
                text += "\n".join(item["text"] for item in out)
            return [Document(text=text, metadata={"file_name": p.name})]
        if p.suffix.lower() in [".docx", ".doc"]:
            txt = docx2txt.process(p)
            return [Document(text=txt, metadata={"file_name": p.name})]
        if p.suffix.lower() in [".xlsx", ".xls"]:
            dfs = pd.read_excel(p, sheet_name=None)
            txt = "\n".join(f"【{s}】\n{d.to_csv(index=False, sep='\t')}" for s, d in dfs.items())
            return [Document(text=txt, metadata={"file_name": p.name})]
        if p.suffix.lower() in [".png", ".jpg", ".jpeg"]:
            out = ocr.ocr(str(p))
            txt = "\n".join(item["text"] for item in out)
            return [Document(text=txt, metadata={"file_name": p.name})]
        # txt / md
        from llama_index.core import SimpleDirectoryReader
        return SimpleDirectoryReader(input_files=[p]).load_data()
# ================= Utility: file hash =================
def file_hash(path: Path, chunk=8192):
    h = hashlib.md5()
    with open(path, "rb") as f:
        # Don't overwrite the chunk size with the data we just read
        while data := f.read(chunk):
            h.update(data)
    return h.hexdigest()
# ================= Directory scan + index build =================
DATA_DIR = Path("data")
SUPPORTED = (".pdf", ".docx", ".doc", ".xlsx", ".xls", ".png", ".jpg", ".jpeg", ".txt", ".md")

def build_index():
    all_docs = []
    for p in DATA_DIR.rglob("*"):
        if p.is_file() and p.suffix.lower() in SUPPORTED:
            all_docs += LocalFileReader().load_data(p)
    return VectorStoreIndex.from_documents(all_docs)
# ================= Watcher: rebuild automatically on file changes =================
class DataHandler(FileSystemEventHandler):
    def __init__(self, rebuild_func):
        self.rebuild = rebuild_func

    def on_any_event(self, event):
        if event.is_directory:
            return
        if Path(event.src_path).suffix.lower() in SUPPORTED:
            print(f"[watchdog] {event.event_type} -> {event.src_path}")
            self.rebuild()

def rebuild():
    global query_engine
    print("[rebuild] re-indexing ...")
    new_index = build_index()
    query_engine = new_index.as_query_engine()
    print("[rebuild] done!")
# ================= Initial build =================
query_engine = build_index().as_query_engine()

# ================= Start watching =================
observer = Observer()
observer.schedule(DataHandler(rebuild), str(DATA_DIR), recursive=True)
observer.start()

# ================= Interactive test =================
if __name__ == "__main__":
    try:
        while True:
            q = input("\nEnter a question (q to quit): ").strip()
            if q.lower() == "q":
                break
            resp = query_engine.query(q)
            print(resp)
    except KeyboardInterrupt:
        pass
    finally:
        observer.stop()
        observer.join()
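Note that file_hash is defined above but never wired in; a sketch (reusing the script's file_hash, DATA_DIR and SUPPORTED, with a hypothetical _hashes cache) of letting the handler skip rebuilds when file contents are unchanged:
_hashes = {}

def changed() -> bool:
    """Recompute every supported file's MD5 and report whether anything differs from last time."""
    global _hashes
    new = {str(p): file_hash(p) for p in DATA_DIR.rglob("*")
           if p.is_file() and p.suffix.lower() in SUPPORTED}
    if new != _hashes:
        _hashes = new
        return True
    return False

# e.g. inside DataHandler.on_any_event: only call self.rebuild() when changed() is True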
Exposing the API with FastAPI
from pathlib import Path
import io
from fastapi import FastAPI
from pydantic import BaseModel
import numpy as np
from PIL import Image
from llama_index.core import Settings, Document, VectorStoreIndex
from llama_index.llms.ollama import Ollama
from llama_index.embeddings.ollama import OllamaEmbedding
from cnocr import CnOcr
import fitz, docx2txt, pandas as pd

# ---------------- Models ----------------
Settings.llm = Ollama(model="gemma3:4b", request_timeout=600)
Settings.embed_model = OllamaEmbedding(model_name="embeddinggemma")
ocr = CnOcr(root="~/cnocr/2.3")
# ---------------- File reader ----------------
class LocalFileReader:
    def load_data(self, file_path: str):
        p = Path(file_path)
        if p.suffix.lower() == ".pdf":
            doc = fitz.open(p)
            text = ""
            for page in doc:
                # cnocr takes a path or ndarray, not raw PNG bytes
                img = np.array(Image.open(io.BytesIO(page.get_pixmap().tobytes("png"))))
                out = ocr.ocr(img)  # cnocr 2.x: list of dicts with a "text" key
                text += "\n".join(item["text"] for item in out)
            return [Document(text=text, metadata={"file_name": p.name})]
        if p.suffix.lower() in [".docx", ".doc"]:
            txt = docx2txt.process(p)
            return [Document(text=txt, metadata={"file_name": p.name})]
        if p.suffix.lower() in [".xlsx", ".xls"]:
            dfs = pd.read_excel(p, sheet_name=None)
            txt = "\n".join(f"【{s}】\n{d.to_csv(index=False, sep='\t')}" for s, d in dfs.items())
            return [Document(text=txt, metadata={"file_name": p.name})]
        if p.suffix.lower() in [".png", ".jpg", ".jpeg"]:
            out = ocr.ocr(str(p))
            txt = "\n".join(item["text"] for item in out)
            return [Document(text=txt, metadata={"file_name": p.name})]
        from llama_index.core import SimpleDirectoryReader
        return SimpleDirectoryReader(input_files=[p]).load_data()
# ---------------- Build the index (once at startup) ----------------
DATA_DIR = Path("data")
SUPPORTED = (".pdf", ".docx", ".doc", ".xlsx", ".xls", ".png", ".jpg", ".jpeg", ".txt", ".md")
all_docs = []
for p in DATA_DIR.rglob("*"):
    if p.is_file() and p.suffix.lower() in SUPPORTED:
        all_docs += LocalFileReader().load_data(p)
index = VectorStoreIndex.from_documents(all_docs)
query_engine = index.as_query_engine()
# ---------------- FastAPI ----------------
app = FastAPI(title="LocalRAG")

class Q(BaseModel):
    question: str

@app.post("/ask")
def ask(q: Q):
    return {"answer": str(query_engine.query(q.question))}

# ---------------- Launch command ----------------
# uvicorn api:app --host 0.0.0.0 --port 8000
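Once uvicorn is up, a quick smoke test of the /ask endpoint (a sketch using requests; the question text is arbitrary):
import requests

r = requests.post("http://127.0.0.1:8000/ask", json={"question": "What is in my documents?"})
print(r.json()["answer"])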
llama-index RAG with Ollama
Install llama-index:
pip install llama-index
Install the corresponding Ollama modules:
pip install llama-index-llms-ollama llama-index-embeddings-ollama
# Pull the models
ollama pull embeddinggemma
ollama pull gemma3:4b
# Feel free to swap in other models
By default it reads the files under the data folder in the current directory (files added later can be picked up by a periodic refresh of the RAG index).
Create the test script demo.py:
#!/usr/bin/env python3
# demo.py : LlamaIndex + Ollama local Q&A
import os
from llama_index.core import (
    VectorStoreIndex,
    SimpleDirectoryReader,
    Settings,
    StorageContext,
    load_index_from_storage,
)
from llama_index.llms.ollama import Ollama
from llama_index.embeddings.ollama import OllamaEmbedding

# 1. Global model configuration
OLLAMA_URL = "http://127.0.0.1:11434"  # default port
EMBED_MODEL = "embeddinggemma"         # embedding model
LLM_MODEL = "gemma3:4b"                # generation model
Settings.embed_model = OllamaEmbedding(
    model_name=EMBED_MODEL, base_url=OLLAMA_URL
)
Settings.llm = Ollama(
    model=LLM_MODEL, base_url=OLLAMA_URL, request_timeout=360.0
)
# 2. Persistence directory (a second launch from the same directory loads the index directly, no re-parsing)
PERSIST_DIR = "./storage"

# 3. Load or build the index
if not os.path.exists(PERSIST_DIR):
    print(">>> First run: parsing documents and building the index …")
    documents = SimpleDirectoryReader("data").load_data()
    index = VectorStoreIndex.from_documents(documents)
    index.storage_context.persist(persist_dir=PERSIST_DIR)
else:
    print(">>> Found an existing index, loading it …")
    storage_context = StorageContext.from_defaults(persist_dir=PERSIST_DIR)
    index = load_index_from_storage(storage_context)

# 4. Create the query engine
query_engine = index.as_query_engine(similarity_top_k=3)

# 5. Simple interactive loop
if __name__ == "__main__":
    print("=== LlamaIndex + Ollama local Q&A ===")
    while True:
        q = input("\nQuestion (q to quit): ").strip()
        if q.lower() == "q":
            break
        resp = query_engine.query(q)
        print("Answer:", resp)
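A full rebuild is not the only way to handle late-arriving files; continuing from the demo above, a sketch of pushing a single new document into the existing index with VectorStoreIndex.insert() (data/new_file.txt is a placeholder path):
# Incrementally add one document without re-parsing the whole data/ folder
for doc in SimpleDirectoryReader(input_files=["data/new_file.txt"]).load_data():
    index.insert(doc)
index.storage_context.persist(persist_dir=PERSIST_DIR)  # keep the on-disk copy in sync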
Scheduled refresh of the data directory
pip install watchdog
Note: the demo below actually polls on a timer with a plain background thread; the watchdog install is only needed for the event-driven variant shown in the first script above.
#!/usr/bin/env python3
# demo.py : LlamaIndex + Ollama local Q&A + timed automatic index refresh
import os, time, threading
from pathlib import Path
from llama_index.core import (
    VectorStoreIndex,
    SimpleDirectoryReader,
    Settings,
    StorageContext,
    load_index_from_storage,
)
from llama_index.llms.ollama import Ollama
from llama_index.embeddings.ollama import OllamaEmbedding

# -------------------- 1. Global model configuration --------------------
OLLAMA_URL = "http://127.0.0.1:11434"
EMBED_MODEL = "nomic-embed-text"  # swap as you like
LLM_MODEL = "llama3.1:8b"         # swap as you like
Settings.embed_model = OllamaEmbedding(model_name=EMBED_MODEL, base_url=OLLAMA_URL)
Settings.llm = Ollama(model=LLM_MODEL, base_url=OLLAMA_URL, request_timeout=360.0)

# -------------------- 2. Parameters --------------------
DATA_DIR = Path("data")
PERSIST_DIR = Path("./storage")
RESCAN_INTERVAL = 5 * 60  # seconds: rescan every 5 minutes
# -------------------- 3. Index management --------------------
def build_index():
    """Re-parse the documents and persist the index."""
    print("[index] rebuilding the index …")
    documents = SimpleDirectoryReader(str(DATA_DIR)).load_data()
    index = VectorStoreIndex.from_documents(documents)
    index.storage_context.persist(persist_dir=str(PERSIST_DIR))
    print("[index] index updated")
    return index

def load_or_build():
    """On startup: load the cache if present, otherwise build."""
    if PERSIST_DIR.exists():
        print(">>> Found an existing index, loading it …")
        storage_context = StorageContext.from_defaults(persist_dir=str(PERSIST_DIR))
        return load_index_from_storage(storage_context)
    else:
        print(">>> First run: parsing documents and building the index …")
        return build_index()

# -------------------- 4. Timed refresh thread --------------------
def md5_dir():
    """Cheap fingerprint: concatenate every file's 'path:mtime'."""
    return "|".join(
        f"{p.relative_to(DATA_DIR)}:{p.stat().st_mtime}"
        for p in sorted(DATA_DIR.rglob("*"))
        if p.is_file()
    )

def watcher():
    """Background thread: scan the directory in a loop; rebuild on change."""
    last_md5 = md5_dir()
    while True:
        time.sleep(RESCAN_INTERVAL)
        new_md5 = md5_dir()
        if new_md5 != last_md5:
            last_md5 = new_md5
            global index, query_engine
            index = build_index()
            query_engine = index.as_query_engine(similarity_top_k=3)
        else:
            print("[index] no changes, skipping refresh")
# -------------------- 5. Startup --------------------
index = load_or_build()
query_engine = index.as_query_engine(similarity_top_k=3)
threading.Thread(target=watcher, daemon=True).start()

# -------------------- 6. Interaction --------------------
if __name__ == "__main__":
    print("=== LlamaIndex + Ollama local Q&A (timed refresh enabled) ===")
    while True:
        q = input("\nQuestion (q to quit): ").strip()
        if q.lower() == "q":
            break
        resp = query_engine.query(q)
        print("Answer:", resp)
Simulating human memory: the hippocampus
Here data/ holds external material; memories come from the conversation itself.
- Short-term memory
  The raw text of the last N rounds of Q&A (a FIFO queue inside ChatMemoryBuffer). Every user question and every model answer is pushed in verbatim; once the total token count exceeds SHORT_MEMORY_TOKENS, the oldest whole round is evicted. That is immediate forgetting.
- Long-term memory
  Produced by a background thread that post-processes the text of the Q&A that just happened:
  - Fact memory: a very crude "A is B" pattern splits sentences into (subject, predicate, object) triples and writes them into FactMemoryBlock; entries are evicted when capacity is full or they go unused for too long.
  - Semantic vectors: whole sentences/fragments are embedded and stored, then recalled later by similarity; scores decay over time, and anything below the threshold or over the cap is deleted.
In other words, no conversation means nothing gets stored; documents (the data/ directory) only supply "external knowledge" and are never written back into the memory system automatically.
If you want the model to also commit what it reads in documents to long-term memory, just iterate over documents after build_index() and call manually:
for doc in documents:
    hippo.vector.add(doc.text)  # semantic memory
    # or extract entities first, then hippo.fact.add(…)  # fact memory
#!/usr/bin/env python3
# demo_memory.py : LlamaIndex + Ollama + hippocampus-style "remember and forget"
import os
import time
import threading
from collections import deque
from datetime import datetime
from pathlib import Path
from llama_index.core import (
    VectorStoreIndex,
    SimpleDirectoryReader,
    Settings,
    StorageContext,
    load_index_from_storage,
)
from llama_index.llms.ollama import Ollama
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_index.core.llms import ChatMessage
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.schema import TextNode
from llama_index.core.vector_stores import SimpleVectorStore, VectorStoreQuery
# -------------------- 1. Global models --------------------
OLLAMA_URL = "http://127.0.0.1:11434"
EMBED_MODEL = "nomic-embed-text"
LLM_MODEL = "llama3.1:8b"
Settings.embed_model = OllamaEmbedding(model_name=EMBED_MODEL, base_url=OLLAMA_URL)
Settings.llm = Ollama(model=LLM_MODEL, base_url=OLLAMA_URL, request_timeout=360.0)

# -------------------- 2. Parameters --------------------
DATA_DIR = Path("data")
PERSIST_DIR = Path("./storage")
MEMORY_DIR = PERSIST_DIR / "memory"  # long-term memory persistence
RESCAN_INTERVAL = 5 * 60
SHORT_MEMORY_TOKENS = 2000  # short-term memory token cap
FACT_LIMIT = 200            # long-term fact cap
VECTOR_LIMIT = 1000         # semantic vector cap
DECAY_HALF_DAY = 0.9        # decay factor per 12 h
# -------------------- 3. Lightweight MemoryBlocks --------------------
class FactMemoryBlock:
    """Structured facts: (subject, predicate, object) + timestamp + use count."""
    def __init__(self, limit=FACT_LIMIT):
        self.limit = limit
        self.facts = []  # [(sub, pred, obj, ts, count), ...]
        self.load()

    def add(self, sub, pred, obj):
        now = datetime.utcnow()
        # Simple merge: keep only the latest obj for a given (sub, pred)
        self.facts = [(s, p, o, ts, c) for (s, p, o, ts, c) in self.facts if not (s == sub and p == pred)]
        self.facts.append((sub, pred, obj, now, 1))
        if len(self.facts) > self.limit:
            # Evict the least used (lowest count, then oldest)
            self.facts.sort(key=lambda x: (x[4], x[3]))
            self.facts = self.facts[-self.limit:]

    def retrieve(self, query_sub, topk=5):
        """Naive substring match + use-count weighting."""
        hits = []
        for sub, pred, obj, ts, c in self.facts:
            score = c
            if query_sub.lower() in sub.lower():
                score += 5
            hits.append((score, sub, pred, obj))
        hits.sort(reverse=True)
        return hits[:topk]

    def persist(self):
        MEMORY_DIR.mkdir(parents=True, exist_ok=True)
        with open(MEMORY_DIR / "facts.txt", "w", encoding="utf8") as f:
            for sub, pred, obj, ts, c in self.facts:
                f.write(f"{sub}\t{pred}\t{obj}\t{ts.isoformat()}\t{c}\n")

    def load(self):
        if (MEMORY_DIR / "facts.txt").exists():
            with open(MEMORY_DIR / "facts.txt", encoding="utf8") as f:
                for line in f:
                    sub, pred, obj, ts_str, c = line.strip().split("\t")
                    self.facts.append((sub, pred, obj, datetime.fromisoformat(ts_str), int(c)))
class VectorMemoryBlock:
    """Semantic vectors: nodes in a SimpleVectorStore, plus timestamp & score."""
    def __init__(self, limit=VECTOR_LIMIT, decay=DECAY_HALF_DAY):
        self.limit = limit
        self.decay = decay
        self.store = SimpleVectorStore()
        self.nodes = []  # local cache: (node, ts, score)
        if (MEMORY_DIR / "vector_store.json").exists():
            self.store = SimpleVectorStore.from_persist_path(str(MEMORY_DIR / "vector_store.json"))
            # Simplified: only the most recent `limit` entries survive
            self._trim()

    def add(self, text):
        now = datetime.utcnow()
        node = TextNode(text=text)
        # SimpleVectorStore expects pre-computed embeddings on the nodes
        node.embedding = Settings.embed_model.get_text_embedding(text)
        self.nodes.append((node, now, 1.0))
        self.store.add([node])
        self._trim()

    def retrieve(self, query_str, top_k=3):
        """Vector retrieval, then re-rank with time-decayed scores."""
        emb = Settings.embed_model.get_text_embedding(query_str)
        result = self.store.query(VectorStoreQuery(query_embedding=emb, similarity_top_k=top_k * 2))
        sims = dict(zip(result.ids or [], result.similarities or []))
        res = []
        for n, ts, score in self.nodes:
            if n.node_id in sims:
                hours = (datetime.utcnow() - ts).total_seconds() / 3600
                decayed = score * (self.decay ** (hours / 12))
                res.append((decayed * sims[n.node_id], n.text))
        res.sort(reverse=True)
        return [txt for _, txt in res[:top_k]]

    def _trim(self):
        if len(self.nodes) > self.limit:
            # Sort by score then timestamp and drop the weakest
            self.nodes.sort(key=lambda x: (x[2], x[1]))
            self.nodes = self.nodes[-self.limit:]
            # Sync-delete from the store (simple full rebuild)
            self.store = SimpleVectorStore()
            self.store.add([n for n, _, _ in self.nodes])

    def persist(self):
        MEMORY_DIR.mkdir(parents=True, exist_ok=True)
        self.store.persist(str(MEMORY_DIR / "vector_store.json"))
# -------------------- 4. Memory manager --------------------
class HippocampusMemory:
    def __init__(self):
        # from_defaults wires in the default tokenizer for the token cap
        self.short = ChatMemoryBuffer.from_defaults(token_limit=SHORT_MEMORY_TOKENS)
        self.fact = FactMemoryBlock()
        self.vector = VectorMemoryBlock()

    def add_interaction(self, user, assistant):
        # Short-term memory (ChatMemoryBuffer stores ChatMessage objects)
        self.short.put(ChatMessage(role="user", content=user))
        self.short.put(ChatMessage(role="assistant", content=assistant))
        # Asynchronous long-term consolidation (naive entity extraction)
        threading.Thread(target=self._consolidate, args=(user + " " + assistant,), daemon=True).start()

    def _consolidate(self, text):
        """Naive fact extraction & semantic snippet storage."""
        # An LLM could extract entities/summaries here; for the demo we split on periods
        for sent in text.split("."):
            sent = sent.strip()
            if len(sent) < 10:
                continue
            # Vector memory
            self.vector.add(sent)
            # Fact memory: simple "A is B" pattern
            if " is " in sent:
                sub, obj = sent.split(" is ", 1)
                self.fact.add(sub.strip(), "is", obj.strip())

    def retrieve_context(self, query):
        # Short-term
        short_hist = "\n".join(str(m) for m in self.short.get())
        # Long-term
        facts = self.fact.retrieve(query)
        vectors = self.vector.retrieve(query)
        long_part = "\n".join(f"{s} {p} {o}" for _, s, p, o in facts) + "\n" + "\n".join(vectors)
        return f"Short-term memory:\n{short_hist}\n\nLong-term memory:\n{long_part}"

    def periodic_forget(self):
        """Periodic forgetting: the long-term blocks manage their own capacity & decay."""
        self.fact.persist()
        self.vector.persist()

hippo = HippocampusMemory()
# -------------------- 5. Index management (same as before) --------------------
def build_index():
    print("[index] rebuilding the index …")
    documents = SimpleDirectoryReader(str(DATA_DIR)).load_data()
    index = VectorStoreIndex.from_documents(documents)
    index.storage_context.persist(persist_dir=str(PERSIST_DIR))
    return index

def load_or_build():
    if PERSIST_DIR.exists():
        storage_context = StorageContext.from_defaults(persist_dir=str(PERSIST_DIR))
        return load_index_from_storage(storage_context)
    else:
        return build_index()
# -------------------- 6. Background thread --------------------
def md5_dir():
    return "|".join(f"{p.relative_to(DATA_DIR)}:{p.stat().st_mtime}" for p in sorted(DATA_DIR.rglob("*")) if p.is_file())

def watcher():
    last_md5 = md5_dir()
    while True:
        time.sleep(RESCAN_INTERVAL)
        new_md5 = md5_dir()
        if new_md5 != last_md5:
            last_md5 = new_md5
            global index
            index = build_index()
        hippo.periodic_forget()
# -------------------- 7. Startup --------------------
index = load_or_build()
threading.Thread(target=watcher, daemon=True).start()

# -------------------- 8. Interaction --------------------
if __name__ == "__main__":
    print("=== LlamaIndex + Ollama + hippocampus memory (q to quit) ===")
    while True:
        q = input("\nQuestion: ").strip()
        if q.lower() == "q":
            break
        # Splice the memories into the prompt as context
        context = hippo.retrieve_context(q)
        prompt = f"Background knowledge for reference:\n{context}\n\nUser question: {q}"
        resp = index.as_query_engine(similarity_top_k=3).query(prompt)
        print("Answer:", resp)
        hippo.add_interaction(q, str(resp))
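For a concrete trace of the crude extractor: an answer sentence like "Paris is the capital of France" becomes the triple (Paris, is, the capital of France) in FactMemoryBlock, and the whole sentence is embedded into VectorMemoryBlock, so a later retrieve_context("Paris") surfaces both.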
Turning documents into long-term memory as well
#!/usr/bin/env python3
# hippo_bot.py : LlamaIndex + Ollama + hippocampus memory (from dialogue + from documents)
import os
import time
import threading
from collections import deque
from datetime import datetime
from pathlib import Path
from llama_index.core import (
    VectorStoreIndex,
    SimpleDirectoryReader,
    Settings,
    StorageContext,
    load_index_from_storage,
    Document,
)
from llama_index.llms.ollama import Ollama
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_index.core.llms import ChatMessage
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.schema import TextNode
from llama_index.core.vector_stores import SimpleVectorStore, VectorStoreQuery
# -------------------- 1. Model configuration --------------------
OLLAMA_URL = "http://127.0.0.1:11434"
EMBED_MODEL = "nomic-embed-text"
LLM_MODEL = "llama3.1:8b"
Settings.embed_model = OllamaEmbedding(model_name=EMBED_MODEL, base_url=OLLAMA_URL)
Settings.llm = Ollama(model=LLM_MODEL, base_url=OLLAMA_URL, request_timeout=360.0)

# -------------------- 2. Directories / constants --------------------
DATA_DIR = Path("data")
PERSIST_DIR = Path("./storage")
MEMORY_DIR = PERSIST_DIR / "memory"
RESCAN_INTERVAL = 5 * 60  # rescan every 5 minutes
SHORT_TOKENS = 2000       # short-term memory token cap
FACT_LIMIT = 200          # fact cap
VECTOR_LIMIT = 1000       # semantic snippet cap
DECAY = 0.9               # decay factor per 12 h
MEMORY_DIR.mkdir(parents=True, exist_ok=True)
# -------------------- 3. Long-term memory blocks (same as the previous version) ----------
class FactMemoryBlock:
    def __init__(self, limit=FACT_LIMIT):
        self.limit = limit
        self.facts = []  # [(sub, pred, obj, ts, count), ...]
        self.load()

    def add(self, sub, pred, obj):
        now = datetime.utcnow()
        self.facts = [(s, p, o, ts, c) for (s, p, o, ts, c) in self.facts if not (s == sub and p == pred)]
        self.facts.append((sub.strip(), pred.strip(), obj.strip(), now, 1))
        if len(self.facts) > self.limit:
            self.facts.sort(key=lambda x: (x[4], x[3]))
            self.facts = self.facts[-self.limit:]

    def retrieve(self, query_sub, topk=5):
        hits = []
        for sub, pred, obj, ts, c in self.facts:
            score = c
            if query_sub.lower() in sub.lower():
                score += 5
            hits.append((score, sub, pred, obj))
        hits.sort(reverse=True)
        return hits[:topk]

    def persist(self):
        with open(MEMORY_DIR / "facts.txt", "w", encoding="utf8") as f:
            for sub, pred, obj, ts, c in self.facts:
                f.write(f"{sub}\t{pred}\t{obj}\t{ts.isoformat()}\t{c}\n")

    def load(self):
        p = MEMORY_DIR / "facts.txt"
        if p.exists():
            with p.open(encoding="utf8") as f:
                for line in f:
                    sub, pred, obj, ts_str, c = line.strip().split("\t")
                    self.facts.append((sub, pred, obj, datetime.fromisoformat(ts_str), int(c)))
class VectorMemoryBlock:
    def __init__(self, limit=VECTOR_LIMIT, decay=DECAY):
        self.limit = limit
        self.decay = decay
        self.store = SimpleVectorStore()
        self.nodes = []  # [(node, ts, score), ...]
        if (MEMORY_DIR / "vector_store.json").exists():
            self.store = SimpleVectorStore.from_persist_path(str(MEMORY_DIR / "vector_store.json"))
            self._trim()

    def add(self, text):
        now = datetime.utcnow()
        node = TextNode(text=text)
        node.embedding = Settings.embed_model.get_text_embedding(text)  # SimpleVectorStore needs pre-computed embeddings
        self.nodes.append((node, now, 1.0))
        self.store.add([node])
        self._trim()

    def retrieve(self, query_str, top_k=3):
        emb = Settings.embed_model.get_text_embedding(query_str)
        result = self.store.query(VectorStoreQuery(query_embedding=emb, similarity_top_k=top_k * 2))
        sims = dict(zip(result.ids or [], result.similarities or []))
        res = []
        for n, ts, score in self.nodes:
            if n.node_id in sims:
                hours = (datetime.utcnow() - ts).total_seconds() / 3600
                decayed = score * (self.decay ** (hours / 12))
                res.append((decayed * sims[n.node_id], n.text))
        res.sort(reverse=True)
        return [txt for _, txt in res[:top_k]]

    def _trim(self):
        if len(self.nodes) > self.limit:
            self.nodes.sort(key=lambda x: (x[2], x[1]))
            self.nodes = self.nodes[-self.limit:]
            self.store = SimpleVectorStore()
            self.store.add([n for n, _, _ in self.nodes])

    def persist(self):
        self.store.persist(str(MEMORY_DIR / "vector_store.json"))
# -------------------- 4. Hippocampus memory manager --------------------
class HippocampusMemory:
    def __init__(self):
        self.short = ChatMemoryBuffer.from_defaults(token_limit=SHORT_TOKENS)
        self.fact = FactMemoryBlock()
        self.vector = VectorMemoryBlock()

    # 1. Memories from dialogue
    def add_dialog(self, user: str, assistant: str):
        self.short.put(ChatMessage(role="user", content=user))
        self.short.put(ChatMessage(role="assistant", content=assistant))
        # Asynchronous consolidation
        threading.Thread(target=self._consolidate, args=(user + " " + assistant,), daemon=True).start()

    # 2. Memories from documents
    def add_documents(self, docs: list[Document]):
        for doc in docs:
            self.vector.add(doc.text)
            # Simple "A is B" extraction
            for sent in doc.text.split("."):
                if " is " in sent:
                    sub, obj = sent.split(" is ", 1)
                    self.fact.add(sub.strip(), "is", obj.strip())

    def _consolidate(self, text: str):
        for sent in text.split("."):
            sent = sent.strip()
            if len(sent) < 10:
                continue
            self.vector.add(sent)
            if " is " in sent:
                sub, obj = sent.split(" is ", 1)
                self.fact.add(sub.strip(), "is", obj.strip())

    def retrieve(self, query: str) -> str:
        short = "\n".join(str(m) for m in self.short.get())
        facts = self.fact.retrieve(query)
        vectors = self.vector.retrieve(query)
        long = "\n".join(f"{s} {p} {o}" for _, s, p, o in facts) + "\n" + "\n".join(vectors)
        return f"Short-term memory:\n{short}\n\nLong-term memory:\n{long}"

    def periodic_persist(self):
        self.fact.persist()
        self.vector.persist()

hippo = HippocampusMemory()
# -------------------- 5. Index management --------------------
def build_index():
    print("[index] parsing documents and building index …")
    documents = SimpleDirectoryReader(str(DATA_DIR)).load_data()
    index = VectorStoreIndex.from_documents(documents)
    index.storage_context.persist(persist_dir=str(PERSIST_DIR))
    # New: also write the document contents into long-term memory
    hippo.add_documents(documents)
    print("[index] index + memory updated")
    return index

def load_or_build():
    if PERSIST_DIR.exists():
        print(">>> Loading existing index …")
        storage_context = StorageContext.from_defaults(persist_dir=str(PERSIST_DIR))
        return load_index_from_storage(storage_context)
    else:
        return build_index()
# -------------------- 6. Background timer thread --------------------
def md5_dir():
    return "|".join(f"{p.relative_to(DATA_DIR)}:{p.stat().st_mtime}" for p in sorted(DATA_DIR.rglob("*")) if p.is_file())

def watcher():
    last_md5 = md5_dir()
    while True:
        time.sleep(RESCAN_INTERVAL)
        new_md5 = md5_dir()
        if new_md5 != last_md5:
            last_md5 = new_md5
            global index
            index = build_index()
        else:
            print("[index] no changes, skipping refresh")
        hippo.periodic_persist()

index = load_or_build()
threading.Thread(target=watcher, daemon=True).start()
# -------------------- 7. Interaction --------------------------
if __name__ == "__main__":
    print("=== LlamaIndex + Ollama + hippocampus memory (q to quit) ===")
    while True:
        q = input("\nQuestion: ").strip()
        if q.lower() == "q":
            break
        context = hippo.retrieve(q)
        prompt = f"Background knowledge for reference:\n{context}\n\nUser question: {q}"
        resp = index.as_query_engine(similarity_top_k=3).query(prompt)
        print("Answer:", resp)
        hippo.add_dialog(q, str(resp))
Explanation
II. Module-by-module walkthrough
- Model configuration
  Identical to before; the OLLAMA_URL and related parameters are just hoisted to the top for one-stop editing.
- Directories and constants
  DATA_DIR: your local document directory. PERSIST_DIR: where LlamaIndex persists the index. MEMORY_DIR: new, holds the local files for the "long-term facts" and the vector store. SHORT_TOKENS / FACT_LIMIT / VECTOR_LIMIT: the three hard memory thresholds that directly set the pace of "forgetting".
- FactMemoryBlock
  Simulates facts with the simplest possible triples: add() keeps only the latest object for a given (subject, predicate); past the cap it evicts by "least used + oldest". retrieve() fuzzy-matches on the subject and returns the topk highest-scoring facts. persist() / load() dump to a plain-text TSV; deleting the file is total amnesia.
- VectorMemoryBlock
  Built on SimpleVectorStore: add(): any text → vector → store. retrieve(): vector-similarity search first, then each result is rescored with time decay before returning the topk (a quick numeric check of this decay follows the list). _trim(): past the cap, the vector store is rebuilt wholesale so that only the most recent / highest-scoring VECTOR_LIMIT entries remain.
- HippocampusMemory (the core)
  Funnels short-term and long-term memory through one interface: add_dialog() pushes every round verbatim into the short-term FIFO and asynchronously calls _consolidate() for the "second pass" into long-term memory. add_documents(): new function; right after the index is built it pours whole document contents into long-term memory (both vectors and facts), i.e. "reading is remembering". retrieve() concatenates the memory types into one string and hands it to the LLM as context. periodic_persist(): periodic flush to disk so a crash loses nothing.
- Index management
  build_index() gains one extra call, hippo.add_documents(documents), so every directory change and re-parse automatically feeds new content into long-term memory; everything else matches the original demo.
- Background watcher thread
  Still scans the directory every 5 minutes → rebuilds the index + writes memory on change; after each scan it also calls periodic_persist() to flush the memory to disk.
- Interaction loop
  Each question first pulls hippo.retrieve(query) in as context; once the answer arrives, the round's (question, answer) is written back into memory, closing the loop.
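A quick numeric check of the decay used by VectorMemoryBlock: with DECAY = 0.9 per 12 h, an entry written 36 hours ago keeps 0.9 ** (36 / 12) = 0.9³ ≈ 0.73 of its original score, so untouched memories sink below fresher ones within a couple of days.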
III. What it looks like at runtime
=== LlamaIndex + Ollama + hippocampus memory (q to quit) ===
>>> Loading existing index …
Question: What is quantum computing?
Answer: Quantum computing is a new computing paradigm that exploits quantum superposition and entanglement for parallel computation ……
(In the background, that Q&A is asynchronously vectorized and fact-extracted into long-term memory.)
...5 min later you drop a new file into data/...
[index] parsing documents and building index …
[index] index + memory updated   ← the new document's content has automatically become long-term memory
When you then ask about a topic from the new document, the LLM receives all of:
- the most recent rounds of dialogue (short-term)
- long-term fact triples
- long-term semantic vector snippets
achieving a dynamic combination of "document reading + multi-turn dialogue" memory on both layers, with automatic forgetting.
Voice simulation
VoxCPM 0.5B supports multiple dialects, plus Chinese and English.
Feed it a reference audio clip and it can mimic that voice.
https://www.modelscope.cn/models/OpenBMB/VoxCPM-0.5B
# 1) Direct synthesis (a single piece of text)
voxcpm --text "Hello VoxCPM" --output out.wav
# 2) Voice cloning (reference audio + its transcript)
voxcpm --text "Hello" \
    --prompt-audio path/to/voice.wav \
    --prompt-text "reference transcript" \
    --output out.wav \
    --denoise
# 3) Batch processing (one piece of text per line)
voxcpm --input examples/input.txt --output-dir outs
# (optional) batch + cloning
voxcpm --input examples/input.txt --output-dir outs \
    --prompt-audio path/to/voice.wav \
    --prompt-text "reference transcript" \
    --denoise
# 4) Inference parameters (quality / speed)
voxcpm --text "..." --output out.wav \
    --cfg-value 2.0 --inference-timesteps 10 --normalize
# 5) Model loading
# Prefer a local path
voxcpm --text "..." --output out.wav --model-path /path/to/VoxCPM_model_dir
# Or auto-download/cache from Hugging Face
voxcpm --text "..." --output out.wav \
    --hf-model-id openbmb/VoxCPM-0.5B --cache-dir ~/.cache/huggingface --local-files-only
# 6) Denoiser control
voxcpm --text "..." --output out.wav \
    --no-denoiser --zipenhancer-path iic/speech_zipenhancer_ans_multiloss_16k_base
# 7) Help
voxcpm --help
python -m voxcpm.cli --help
In Windows CMD, replace the leading voxcpm with python -m voxcpm.cli.
Voice-cloning example:
python -m voxcpm.cli --text "你好你在干什么啊" --prompt-audio a.wav --prompt-text "你好 现在是几点钟了 明天又是什么时候呢 大家 都在上班还是上学" --output out.wav --denoise
Web test page
After downloading the repo, cd into its directory and run python app.py.
http://localhost:7860