LLAMAINDEX 分租户 部门:Filter过滤+节点后处理

检索器作用:

  • 1向量模型的语义过滤(更具问题的向量和向量存储数据进行向量匹配)
  • 2 使用元数据过滤

初步作用

1 创建是加入metadata属性

2创建 过滤器

3 使用检索器(index.as_retriever())过滤器(检索器内使用 或者如下简单使用)

  • 基础元数据过滤器(MetadataFilter)
  • 精确匹配过滤器(ExactMatchFilter)
  • 嵌套的MetadataFilters实例
from llama_index.core.vector_store.types import MetadataFilter,ExactMatchFilter
documents = SimpleDirectoryReader("data").load_data()
for document in documents:
    document.metadata['department']="ABC"
#添加元数据 department为ABC 单文件也是同样返回list类型
#之后就是节点化
#过滤模块 ExactMatchFilter 和MetadataFilters
#filters过滤,下面创建了ABC部门的过滤器
filtersABC=MetadataFilters(
    filters=[ExactMatchFilter(key="department",value="ABC")]
)
实际使用最简单的查询 index对象
xxx=index.as_query_engine(filters=filtersABC)

正式查询 RetrieverQueryEngine

from llama_index.core.query_engine import RetrieverQueryEngine

下面正式简写的逻辑代码

from llama_index.core import VectorStoreIndex, get_response_synthesizer
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.postprocessor import SimilarityPostprocessor
# 1. 新增过滤器相关导入
from llama_index.core.vector_stores import ExactMatchFilter, MetadataFilters
# 2. 建立过滤器
filtersABC = MetadataFilters(
    filters=[ExactMatchFilter(key="department", value="ABC")]
)
# 3. 建索引(保持不变)
index = VectorStoreIndex.from_documents(documents)
# 4. 把过滤器挂到检索器,similarity_top_k相似度最高的10个结果,使用过滤器filtersABC过滤此处还用向量模型更具问题向量与向量库进行匹配并将score值附上,以此给下面的节点后处理的相似度过滤器使用
retriever = VectorIndexRetriever(
    index=index,
    similarity_top_k=10,
    filters=filtersABC          # <── 关键一句
)
# 5. 响应器的创建,下面使用默认配置
response_synthesizer = get_response_synthesizer()
# 6.创建查询器 检索器来的数据,响应器,节点后处理(相似度大于0.7)
query_engine = RetrieverQueryEngine(
    retriever=retriever,
    response_synthesizer=response_synthesizer,
    node_postprocessors=[SimilarityPostprocessor(similarity_cutoff=0.7)],
)
response = query_engine.query("What did the author do growing up?")
print(response)

再加上节点关键词后处理KeywordNodePostprocessor

from llama_index.core.postprocessor import KeywordNodePostprocessor
#和相似度一个地方导入
keyword_filter = KeywordNodePostprocessor(
    required_keywords=["author", "childhood"],   # 必须同时出现
    exclude_keywords=["fiction", "novel"]        # 一旦出现就整段扔掉
)
#在其中查询加入:
node_postprocessors=[
        SimilarityPostprocessor(similarity_cutoff=0.7),  # 1. 先卡相似度
        keyword_filter,                                  # 2. 再卡关键词
    ]

前后节点PrevNextNodePostprocessor一起拉回来,实现更好的上下文

from llama_index.core.postprocessor import PrevNextNodePostprocessor
#和相似度一个地方导入
prev_next = PrevNextNodePostprocessor(
    num_prev=1,      # 往前多拿 1 个
    num_next=1,      # 往后多拿 1 个
    include_original=True,   # 把原始节点也保留
)
#在其中查询加入:
node_postprocessors=[
        SimilarityPostprocessor(similarity_cutoff=0.7),  # ① 相似度
        keyword_filter,                                  # ② 关键词
        prev_next,                                       # ③ 扩上下文
    ],

加上时间顺序

from llama_index.core import SimpleDirectoryReader
documents = SimpleDirectoryReader("data").load_data()
for doc in documents:
    doc.metadata["date"] = "2025-12-12"   # 统一日期
from llama_index.core.postprocessor import FixedRecencyPostprocessor
#和相似度一个地方导入
# 1. 按 metadata 里的 "date" 字段排序,只留最新的 5 个节点
recency = FixedRecencyPostprocessor(
    top_k=5,               # 保留最新 5 个
    date_key="date"        # 对应 metadata 中的时间字段名
)
#在其中查询加入:
 node_postprocessors=[
        SimilarityPostprocessor(similarity_cutoff=0.7),  # ① 相似度
        keyword_filter,                                  # ② 关键词
        prev_next,                                       # ③ 扩上下文
        recency,                                         # ④ 只留最新 5 个
    ],

EmbeddingRecencyPostprocessor TimeWeightedPostprocessor

参数EmbeddingRecencyPostprocessorTimeWeightedPostprocessor说明
alpha✅ 0→1 可调❌ 无时间权重;0=只看向量,1=只看新鲜度
time_decay (λ)✅ 可调✅ 可调衰减系数;越大,旧文档得分掉得越快
top_k最终按综合分排序后保留多少节点
date_key✅ 默认 “date”✅ 默认 “date”节点 metadata 里的日期字段名
  • 想“稍微”让新文档靠前,但又不想误杀经典老文档
    → EmbeddingRecencyPostprocessor(alpha 设 0.3–0.5)
  • 业务只关心“最近一年”的内容,老文档基本无用
    → TimeWeightedPostprocessor(lambda 0.99–0.995)
  • 希望平滑调节“向量 vs 时间”天平
    → 只有 EmbeddingRecencyPostprocessor 能给 alpha 旋钮。

# 方案 A:线性插值
recency = EmbeddingRecencyPostprocessor(
    alpha=0.5,          # 时间占 50 %
    time_decay=0.995,   # 旧文档衰减斜率
    top_k=5,
    date_key="date"
)
# 方案 B:乘法衰减
time_weight = TimeWeightedPostprocessor(
    time_decay=0.995,
    top_k=5,
    date_key="date"
)

重排 LLMRerank 默认,需要在前后节点召回之前PrevNextNodePostprocessor省tokens

from llama_index.core.postprocessor import (
    SimilarityPostprocessor,
    KeywordNodePostprocessor,
    PrevNextNodePostprocessor,
    FixedRecencyPostprocessor,
    LLMRerank,  # ① 新增
)
llm_rerank = LLMRerank(top_n=4, choice_batch_size=3)  
# 一次让 LLM 评 3 条 一共回4条
query_engine = RetrieverQueryEngine(
    retriever=retriever,
    response_synthesizer=get_response_synthesizer(),
    node_postprocessors=[sim, key, date, llm_rerank, prev_next], 
)
###
向量召回 10 节点
→ similarity≥0.7
→ 含关键词
→ 取最新 7 个
→ LLM 重排选 3 个
→ 前后各补 1 个相邻节点
→ 送 LLM 生成答案

去重(MD5)DeduplicatePostProcessor(show_progress=True)

在重排LLMRerank之后,扩充上下文之前PrevNextNodePostprocessor

dedup=DeduplicatePostProcessor(show_progress=True)

时间硬截断 时间不在最新的几个节点,相当于最近时间的几个节点top_k。主要针对metadata中的时间元数据 date_key。reverse=False(默认False是升序,True则为最先的,自己看着办吧)

fixedt=FixedRecencyPostprocessor(top_k=8,date_key=’create_time’,reverse=False)

llamaindex补充知识 搜索器 🟰检索器 节点处理 相应器

检索器

一开始就没有加入这个知识点 以为生成引索后直接使用 关键词 和 大模型 llm 来查询

其实还有一步检索器检索器 检索器 可以对引用的数据初步检索 可以做到分部门(权限)

节点处理器

之后节点处理 还可以对数据处理 隐私处理 节点前后处理实现长文档处理 对检索的数据评分过滤 还可以减少传入值 (若没有这一步 每次大模型检索会把全部节点向量,连同问题一同进入特别耗tokens 再者目前llm 一般128k上下文,可能获取数据不全导致回复有问题)还有重复处理 时间处理(时间规则顺序)

响应器

在接受到问题后 使用预设prompt 和llm 通信,响应后然后将问题与节点处理后的数据一一交给llm 然后得出最终结果

以上三个组成llamaindex的搜索器。

用户提问

检索器(召回相关节点)

节点处理器(过滤、脱敏、压缩、重排)

响应器(组织提示词 + LLM 生成答案)

返回结果(可带引用)

llamaindex关于删除 即更新文件 向量库的更新

Chroma向量数据库

代码如下:

# 1. 安装依赖
# pip install llama-index llama-index-vector-stores-chroma llama-index-embeddings-ollama chromadb

import chromadb
from llama_index.core import VectorStoreIndex, StorageContext, SimpleDirectoryReader
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.embeddings.ollama import OllamaEmbedding   # 关键替换

# 2. 初始化 Chroma
chroma_client = chromadb.PersistentClient(path="./chroma_db")
collection = chroma_client.get_or_create_collection("ollama_embed")

# 3. 指定 Ollama 提供的向量模型
embed_model = OllamaEmbedding(
    model_name="nomic-embed-text",   # 也可换成 mxbai-embed-large、snowflake-arctic-embed 等
    base_url="http://localhost:11434",  # Ollama 默认地址
)

# 4. 构建 vector_store & storage_context
vector_store = ChromaVectorStore(chroma_collection=collection)
storage_context = StorageContext.from_defaults(vector_store=vector_store)

# 5. 加载文档并一次性向量化入库
documents = SimpleDirectoryReader("./data").load_data()
index = VectorStoreIndex.from_documents(
    documents,
    storage_context=storage_context,
    embed_model=embed_model,
    show_progress=True
)

# 6. 验证查询
query_engine = index.as_query_engine(similarity_top_k=3)
print(query_engine.query("你的问题"))

如果使用删除 那么需要调用 collection.delete(ids=[file_path])这个方法,但是Chroma 里的 ids 自然就是“路径+序号”,所以需要先找到文件的ids。

matched_ids = []
# 删除的ids的chunks列表默认为空 防止错误
# 1. 一次性把 collection 里所有记录拉回
all_existing = collection.get(include=["metadatas"])   # 默认只返回 ids,加 include 才有 metadata

# 2. 过滤出你要的文件路径
target_path = "/home/you/project/data/obsolete.pdf"
matched_ids = [ _id
                for _id, meta in zip(all_existing["ids"], all_existing["metadatas"])
                if meta.get("file_path") == target_path ]     # 这里字段名必须与你当初存的一致

# 3. 打印确认
print("待删除 IDs:", matched_ids)

# 4. 真正删除 (删除列表)
if matched_ids:
    collection.delete(ids=matched_ids)


#循环显示通读的代码:找到 matched_ids 
for _id, meta in zip(all_existing["ids"], all_existing["metadatas"]):
    print(f"检查 ID: {_id}, metadata: {meta}")          # 每一步都打印,看得一清二楚
    if meta.get("file_path") == target_path:
        matched_ids.append(_id)
        print(f"  --> 命中,加入待删列表")
    else:
        print(f"  --> 不匹配,跳过")

跟新的化再执行插入:

documents = SimpleDirectoryReader("./data").load_data()
index = VectorStoreIndex.from_documents(
    documents,
    storage_context=storage_context,
    embed_model=embed_model,
    show_progress=True
)

PLAN for AI UART 两Micropython设备操作

明白了:

  • 设备 A(本地)——你手里正在跑代码、带键盘/屏幕的那块板子;
  • 设备 B(远端)——只引出 UART-TX/RX 的裸机 MicroPython 板;
  • 双方都已经烧好同版本 MicroPython,波特率统一 115200;
  • 目标:在 A 上敲命令 → 把 B 的 REPL 当“远程 shell”用,并且能把 B 上的文件拉回 A 编辑,再写回去。

下面给出“双端都是 MicroPython”的最简实现,全部用 pyboard.py 核心思路——也就是 raw-REPL(Ctrl-A) 模式:

  1. 发代码时一次性把脚本贴进去;
  2. 远端在 raw-REPL 里执行完后把 stdout+stderr 统一回传;
  3. 文件内容用 uzlib + base64 流式打包,避免内存炸,也兼容二进制。

一、硬件连线

A.TX → B.RX
A.RX → B.TX
GND 共地

(如果两块 3.3 V 直接连即可;5 V 板子请串 1 kΩ 或加电平转换)


二、公用底层:raw_REPL 会话函数

把下面代码存到 A 的 /lib/uart_repl.py,后面所有功能都调它:

import time, binascii, uzlib, gc

class UartRepl:
    def __init__(self, uart, timeout=3):
        self.u = uart
        self.u.read()          # 清空缓冲
        self.timeout = timeout

    def _wait(self, want):
        t = time.ticks_ms()
        buf = b''
        while time.ticks_diff(time.ticks_ms(), t) < self.timeout*1000:
            if self.u.any():
                buf += self.u.read(1)
                if buf.endswith(want):
                    return buf
        raise OSError('uart repl timeout')

    def enter_raw_repl(self):
        self.u.write(b'\r\x03\x03')  # 两次 Ctrl-C 打断
        time.sleep_ms(200)
        self.u.write(b'\x01')        # Ctrl-A 进 raw-REPL
        self._wait(b'raw REPL; CTRL-B to exit\r\n>')

    def exit_raw_repl(self):
        self.u.write(b'\x02')        # Ctrl-B 回到友好 REPL

    def exec_(self, src):
        src = src.strip()
        self.u.write(bytes(src, 'utf-8'))
        self.u.write(b'\x04')        # Ctrl-D 执行
        self._wait(b'OK')
        # 读回输出,直到出现 '\x04' 或 '\x04\x04' 错误标记
        out = self._wait(b'\x04')
        if out.endswith(b'\x04\x04'):
            raise Exception('remote exception:\n' + out[:-2].decode())
        return out[:-1].decode()     # 去掉结尾 \x04

三、功能 1:远程“命令行”

from machine import UART
from uart_repl import UartRepl
u = UART(2, 115200)
r = UartRepl(u)
r.enter_raw_repl()

while True:
    cmd = input('B# ')       # 在 A 端敲命令
    if cmd == 'exit':
        break
    print( r.exec_(cmd) )
r.exit_raw_repl()

四、功能 2:把 B 的文件拉回 A

思路:

  1. 远端分块读取 → base64 → print;
  2. 本地收到后解码 → 写 SD/Flash;
  3. 最后远端发一行 ###EOF### 当结束标记。
def pull(remote, local, chunk=512):
    script = f'''
import ubinascii, gc
with open('{remote}','rb') as f:
    while 1:
        blk=f.read({chunk})
        if not blk:break
        print(ubinascii.b2a_base64(blk),end='')
print('###EOF###')
'''
    r.enter_raw_repl()
    data = r.exec_(script)
    r.exit_raw_repl()
    # 本地还原
    with open(local,'wb') as fout:
        for line in data.splitlines():
            if line=='###EOF###':break
            fout.write(binascii.a2b_base64(line))
    print('pull done', local)

用例:

>>> pull('main.py', '/sd/main_remote.py')

五、功能 3:把 A 的文件推到 B

推之前先读本地文件 → base64 分块 → 发脚本 → 远端解码写盘。

def push(local, remote, chunk=512):
    import ubinascii
    r.enter_raw_repl()
    with open(local,'rb') as f:
        while 1:
            blk = f.read(chunk)
            if not blk: break
            b64 = binascii.b2a_base64(blk).strip()
            r.exec_(f"""
import ubinascii, gc
with open('{remote}','ab') as fo:
    fo.write(ubinascii.a2b_base64({repr(b64)}))
""")
    # 文件开头先 truncate
    r.exec_(f"import uos; uos.remove('{remote}') if '{remote}' in uos.listdir() else 0")
    r.exec_(f"import uos; uos.rename('{remote}.tmp','{remote}')")  # 原子替换
    r.exit_raw_repl()
    print('push done', remote)

用例:

>>> push('/sd/new_boot.py', 'boot.py')

六、做成 A 上的“远程文件管理”命令行

把上面函数再包一层交互:

if __name__ == '__main__':
    import sys, machine, uart_repl
    u = machine.UART(2, 115200)
    r = uart_repl.UartRepl(u)
    cmd = sys.argv[1]
    if cmd == 'pull':
        pull(sys.argv[2], sys.argv[3])
    elif cmd == 'push':
        push(sys.argv[2], sys.argv[3])
    elif cmd == 'sh':
        shell_mode(r)   # 前面 while True 那个函数

A 的 REPL 下就能:

>>> import uart_fs
>>> uart_fs.pull('data.csv', '/sd/data.csv')
>>> uart_fs.push('/sd/data_new.csv', 'data.csv')

七、二进制/大文件注意事项

  • chunk 设 512 字节时,RAM 峰值 < 2 kB,ESP32/RP2040 都无压力;
  • 若文件几百 kB,可在远端 gc.collect() 每次写完一块;
  • 如果两端都带硬件 flow-control(RTS/CTS),可把波特率直接拉到 921600,传输 100 kB 只需几秒。

八、一句话回顾

“本地 MicroPython” 用 raw-REPL 把 “远端 MicroPython” 当脚本引擎:

  • 读文件 = 远端 print(base64(chunk)) → 本地解码;
  • 写文件 = 本地 base64(chunk) → 远端 write(decode)
    只要一条 UART,就能把 B 的整个文件系统挂载到 A 上玩。

Micropython

1 使用一个micropython设备使用UART 通过外接设备一个micropython设备 实现repl,
读取命令到变量command上 ,按下enter后将命令通过UART发送给MPY设备,UART读取返回信息,打印。
文档编辑:将文件通过UART获取到本地,然后打开,对其修改,然后上传到设备。
那么如何通过获取文件呢?因为本设备和MPY设备有一个UART通道用于命令交互,那么如何获取文件呢?

PLAN FOR AI

  1. 发代码时一次性把脚本贴进去;
  2. 远端在 raw-REPL 里执行完后把 stdout+stderr 统一回传;
  3. 文件内容用 uzlib + base64 流式打包,避免内存炸,也兼容二进制。

2 本地一块MIcropython设备,使用 os.dupterm() FBConsole来实现,其中命令行交互 无问题,Ctrl+c +x 之类 使用发送字符 ,
问题一:文件文本编辑需要 行字符串编辑 ,内存先全部读取,然后显示,一行,一行编辑,体验不好。

附加16进制控制符

十六进制十进制缩写名称十六进制十进制缩写名称
0x000NUL空字符0x1016DLE数据链路转义
0x011SOH标题开始0x1117DC1设备控制1
0x022STX正文开始0x1218DC2设备控制2
0x033ETX正文结束 Ctrl+C0x1319DC3设备控制3
0x044EOT传输结束 Ctrl+D0x1420DC4设备控制4
0x055ENQ询问0x1521NAK拒绝接收
0x066ACK收到通知0x1622SYN同步空闲
0x077BEL0x1723ETB传输块结束
0x088BS退格0x1824CAN取消 Ctrl+X
0x099HT水平制表符0x1925EM介质中断
0x0A10LF换行键0x1A26SUB替换
0x0B11VT垂直制表符0x1B27ESC换码
0x0C12FF换页键0x1C28FS文件分隔符
0x0D13CR回车键0x1D29GS组分隔符
0x0E14SO移出0x1E30RS记录分离符
0x0F15SI移入0x1F31US单元分隔符
组合键十六进制十进制组合键十六进制十进制组合键十六进制十进制
Ctrl+A0x011Ctrl+J0x0A10Ctrl+S0x1319
Ctrl+B0x022Ctrl+K0x0B11Ctrl+T0x1420
Ctrl+C0x033Ctrl+L0x0C12Ctrl+U0x1521
Ctrl+D0x044Ctrl+M0x0D13Ctrl+V0x1622
Ctrl+E0x055Ctrl+N0x0E14Ctrl+W0x1723
Ctrl+F0x066Ctrl+O0x0F15Ctrl+X0x1824
Ctrl+G0x077Ctrl+P0x1016Ctrl+Y0x1925
Ctrl+H0x088Ctrl+Q0x1117Ctrl+Z0x1A26
Ctrl+I0x099Ctrl+R0x1218Ctrl+[0x1B27