llamaindex关于删除 即更新文件 向量库的更新

Chroma向量数据库

代码如下:

# 1. 安装依赖
# pip install llama-index llama-index-vector-stores-chroma llama-index-embeddings-ollama chromadb

import chromadb
from llama_index.core import VectorStoreIndex, StorageContext, SimpleDirectoryReader
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.embeddings.ollama import OllamaEmbedding   # 关键替换

# 2. 初始化 Chroma
chroma_client = chromadb.PersistentClient(path="./chroma_db")
collection = chroma_client.get_or_create_collection("ollama_embed")

# 3. 指定 Ollama 提供的向量模型
embed_model = OllamaEmbedding(
    model_name="nomic-embed-text",   # 也可换成 mxbai-embed-large、snowflake-arctic-embed 等
    base_url="http://localhost:11434",  # Ollama 默认地址
)

# 4. 构建 vector_store & storage_context
vector_store = ChromaVectorStore(chroma_collection=collection)
storage_context = StorageContext.from_defaults(vector_store=vector_store)

# 5. 加载文档并一次性向量化入库
documents = SimpleDirectoryReader("./data").load_data()
index = VectorStoreIndex.from_documents(
    documents,
    storage_context=storage_context,
    embed_model=embed_model,
    show_progress=True
)

# 6. 验证查询
query_engine = index.as_query_engine(similarity_top_k=3)
print(query_engine.query("你的问题"))

如果使用删除 那么需要调用 collection.delete(ids=[file_path])这个方法,但是Chroma 里的 ids 自然就是“路径+序号”,所以需要先找到文件的ids。

matched_ids = []
# 删除的ids的chunks列表默认为空 防止错误
# 1. 一次性把 collection 里所有记录拉回
all_existing = collection.get(include=["metadatas"])   # 默认只返回 ids,加 include 才有 metadata

# 2. 过滤出你要的文件路径
target_path = "/home/you/project/data/obsolete.pdf"
matched_ids = [ _id
                for _id, meta in zip(all_existing["ids"], all_existing["metadatas"])
                if meta.get("file_path") == target_path ]     # 这里字段名必须与你当初存的一致

# 3. 打印确认
print("待删除 IDs:", matched_ids)

# 4. 真正删除 (删除列表)
if matched_ids:
    collection.delete(ids=matched_ids)


#循环显示通读的代码:找到 matched_ids 
for _id, meta in zip(all_existing["ids"], all_existing["metadatas"]):
    print(f"检查 ID: {_id}, metadata: {meta}")          # 每一步都打印,看得一清二楚
    if meta.get("file_path") == target_path:
        matched_ids.append(_id)
        print(f"  --> 命中,加入待删列表")
    else:
        print(f"  --> 不匹配,跳过")

跟新的化再执行插入:

documents = SimpleDirectoryReader("./data").load_data()
index = VectorStoreIndex.from_documents(
    documents,
    storage_context=storage_context,
    embed_model=embed_model,
    show_progress=True
)
发表在 None | 留下评论

PLAN for AI UART 两Micropython设备操作

明白了:

  • 设备 A(本地)——你手里正在跑代码、带键盘/屏幕的那块板子;
  • 设备 B(远端)——只引出 UART-TX/RX 的裸机 MicroPython 板;
  • 双方都已经烧好同版本 MicroPython,波特率统一 115200;
  • 目标:在 A 上敲命令 → 把 B 的 REPL 当“远程 shell”用,并且能把 B 上的文件拉回 A 编辑,再写回去。

下面给出“双端都是 MicroPython”的最简实现,全部用 pyboard.py 核心思路——也就是 raw-REPL(Ctrl-A) 模式:

  1. 发代码时一次性把脚本贴进去;
  2. 远端在 raw-REPL 里执行完后把 stdout+stderr 统一回传;
  3. 文件内容用 uzlib + base64 流式打包,避免内存炸,也兼容二进制。

一、硬件连线

A.TX → B.RX
A.RX → B.TX
GND 共地

(如果两块 3.3 V 直接连即可;5 V 板子请串 1 kΩ 或加电平转换)


二、公用底层:raw_REPL 会话函数

把下面代码存到 A 的 /lib/uart_repl.py,后面所有功能都调它:

import time, binascii, uzlib, gc

class UartRepl:
    def __init__(self, uart, timeout=3):
        self.u = uart
        self.u.read()          # 清空缓冲
        self.timeout = timeout

    def _wait(self, want):
        t = time.ticks_ms()
        buf = b''
        while time.ticks_diff(time.ticks_ms(), t) < self.timeout*1000:
            if self.u.any():
                buf += self.u.read(1)
                if buf.endswith(want):
                    return buf
        raise OSError('uart repl timeout')

    def enter_raw_repl(self):
        self.u.write(b'\r\x03\x03')  # 两次 Ctrl-C 打断
        time.sleep_ms(200)
        self.u.write(b'\x01')        # Ctrl-A 进 raw-REPL
        self._wait(b'raw REPL; CTRL-B to exit\r\n>')

    def exit_raw_repl(self):
        self.u.write(b'\x02')        # Ctrl-B 回到友好 REPL

    def exec_(self, src):
        src = src.strip()
        self.u.write(bytes(src, 'utf-8'))
        self.u.write(b'\x04')        # Ctrl-D 执行
        self._wait(b'OK')
        # 读回输出,直到出现 '\x04' 或 '\x04\x04' 错误标记
        out = self._wait(b'\x04')
        if out.endswith(b'\x04\x04'):
            raise Exception('remote exception:\n' + out[:-2].decode())
        return out[:-1].decode()     # 去掉结尾 \x04

三、功能 1:远程“命令行”

from machine import UART
from uart_repl import UartRepl
u = UART(2, 115200)
r = UartRepl(u)
r.enter_raw_repl()

while True:
    cmd = input('B# ')       # 在 A 端敲命令
    if cmd == 'exit':
        break
    print( r.exec_(cmd) )
r.exit_raw_repl()

四、功能 2:把 B 的文件拉回 A

思路:

  1. 远端分块读取 → base64 → print;
  2. 本地收到后解码 → 写 SD/Flash;
  3. 最后远端发一行 ###EOF### 当结束标记。
def pull(remote, local, chunk=512):
    script = f'''
import ubinascii, gc
with open('{remote}','rb') as f:
    while 1:
        blk=f.read({chunk})
        if not blk:break
        print(ubinascii.b2a_base64(blk),end='')
print('###EOF###')
'''
    r.enter_raw_repl()
    data = r.exec_(script)
    r.exit_raw_repl()
    # 本地还原
    with open(local,'wb') as fout:
        for line in data.splitlines():
            if line=='###EOF###':break
            fout.write(binascii.a2b_base64(line))
    print('pull done', local)

用例:

>>> pull('main.py', '/sd/main_remote.py')

五、功能 3:把 A 的文件推到 B

推之前先读本地文件 → base64 分块 → 发脚本 → 远端解码写盘。

def push(local, remote, chunk=512):
    import ubinascii
    r.enter_raw_repl()
    with open(local,'rb') as f:
        while 1:
            blk = f.read(chunk)
            if not blk: break
            b64 = binascii.b2a_base64(blk).strip()
            r.exec_(f"""
import ubinascii, gc
with open('{remote}','ab') as fo:
    fo.write(ubinascii.a2b_base64({repr(b64)}))
""")
    # 文件开头先 truncate
    r.exec_(f"import uos; uos.remove('{remote}') if '{remote}' in uos.listdir() else 0")
    r.exec_(f"import uos; uos.rename('{remote}.tmp','{remote}')")  # 原子替换
    r.exit_raw_repl()
    print('push done', remote)

用例:

>>> push('/sd/new_boot.py', 'boot.py')

六、做成 A 上的“远程文件管理”命令行

把上面函数再包一层交互:

if __name__ == '__main__':
    import sys, machine, uart_repl
    u = machine.UART(2, 115200)
    r = uart_repl.UartRepl(u)
    cmd = sys.argv[1]
    if cmd == 'pull':
        pull(sys.argv[2], sys.argv[3])
    elif cmd == 'push':
        push(sys.argv[2], sys.argv[3])
    elif cmd == 'sh':
        shell_mode(r)   # 前面 while True 那个函数

A 的 REPL 下就能:

>>> import uart_fs
>>> uart_fs.pull('data.csv', '/sd/data.csv')
>>> uart_fs.push('/sd/data_new.csv', 'data.csv')

七、二进制/大文件注意事项

  • chunk 设 512 字节时,RAM 峰值 < 2 kB,ESP32/RP2040 都无压力;
  • 若文件几百 kB,可在远端 gc.collect() 每次写完一块;
  • 如果两端都带硬件 flow-control(RTS/CTS),可把波特率直接拉到 921600,传输 100 kB 只需几秒。

八、一句话回顾

“本地 MicroPython” 用 raw-REPL 把 “远端 MicroPython” 当脚本引擎:

  • 读文件 = 远端 print(base64(chunk)) → 本地解码;
  • 写文件 = 本地 base64(chunk) → 远端 write(decode)
    只要一条 UART,就能把 B 的整个文件系统挂载到 A 上玩。
发表在 None | 留下评论

Micropython

1 使用一个micropython设备使用UART 通过外接设备一个micropython设备 实现repl,
读取命令到变量command上 ,按下enter后将命令通过UART发送给MPY设备,UART读取返回信息,打印。
文档编辑:将文件通过UART获取到本地,然后打开,对其修改,然后上传到设备。
那么如何通过获取文件呢?因为本设备和MPY设备有一个UART通道用于命令交互,那么如何获取文件呢?

PLAN FOR AI

  1. 发代码时一次性把脚本贴进去;
  2. 远端在 raw-REPL 里执行完后把 stdout+stderr 统一回传;
  3. 文件内容用 uzlib + base64 流式打包,避免内存炸,也兼容二进制。

2 本地一块MIcropython设备,使用 os.dupterm() FBConsole来实现,其中命令行交互 无问题,Ctrl+c +x 之类 使用发送字符 ,
问题一:文件文本编辑需要 行字符串编辑 ,内存先全部读取,然后显示,一行,一行编辑,体验不好。

附加16进制控制符

十六进制十进制缩写名称十六进制十进制缩写名称
0x000NUL空字符0x1016DLE数据链路转义
0x011SOH标题开始0x1117DC1设备控制1
0x022STX正文开始0x1218DC2设备控制2
0x033ETX正文结束 Ctrl+C0x1319DC3设备控制3
0x044EOT传输结束 Ctrl+D0x1420DC4设备控制4
0x055ENQ询问0x1521NAK拒绝接收
0x066ACK收到通知0x1622SYN同步空闲
0x077BEL0x1723ETB传输块结束
0x088BS退格0x1824CAN取消 Ctrl+X
0x099HT水平制表符0x1925EM介质中断
0x0A10LF换行键0x1A26SUB替换
0x0B11VT垂直制表符0x1B27ESC换码
0x0C12FF换页键0x1C28FS文件分隔符
0x0D13CR回车键0x1D29GS组分隔符
0x0E14SO移出0x1E30RS记录分离符
0x0F15SI移入0x1F31US单元分隔符
组合键十六进制十进制组合键十六进制十进制组合键十六进制十进制
Ctrl+A0x011Ctrl+J0x0A10Ctrl+S0x1319
Ctrl+B0x022Ctrl+K0x0B11Ctrl+T0x1420
Ctrl+C0x033Ctrl+L0x0C12Ctrl+U0x1521
Ctrl+D0x044Ctrl+M0x0D13Ctrl+V0x1622
Ctrl+E0x055Ctrl+N0x0E14Ctrl+W0x1723
Ctrl+F0x066Ctrl+O0x0F15Ctrl+X0x1824
Ctrl+G0x077Ctrl+P0x1016Ctrl+Y0x1925
Ctrl+H0x088Ctrl+Q0x1117Ctrl+Z0x1A26
Ctrl+I0x099Ctrl+R0x1218Ctrl+[0x1B27
发表在 None | 留下评论

llamaindex 研究关键词 sqlite多文件插入

chunk_size = 1024
chunk_overlap = 200
默认1024长度 重叠200

文档类型推荐 chunk_sizeoverlap额外技巧
制度/规章(段落长、条款多)1 024–2 048200–400以“## 标题”为分隔符,保证条款完整
接口文档(URL/错误码表格)512–1 024100–200一级标题 + 二级标题拼一起,别把表格拦腰砍断
会议记录/周报(短片段)256–51250–100按“—-”或“## 周次”切,避免把多周拼一起
超长 RFC/设计文档2 048–4 096400先按“##”切,再对子章节二次切,做双层摘要索引

关键词

A whoosh+jieba 10万段 chunck 约2000篇

# pip install whoosh jieba llama-index-core llama-index-retrievers-whoosh

from llama_index.core import SimpleDirectoryReader
from llama_index.retrievers.whoosh import WhooshRetriever
from llama_index.core.query_engine import RetrieverQueryEngine

# 1. 读取文档
documents = SimpleDirectoryReader("data").load_data()

# 2. 建/载 whoosh 倒排
whoosh_retriever = WhooshRetriever(
    index_dir="./whoosh_idx",
    docs=documents,
    top_k=5
)

# 3. 纯关键词查询引擎
query_engine = RetrieverQueryEngine(whoosh_retriever)

# 4. 交互
if __name__ == "__main__":
    print("=== Whoosh 纯关键词查询 ===")
    while True:
        q = input("\n问题 (q退出): ").strip()
        if q.lower() == "q":
            break
        resp = query_engine.query(q)
        print("答:", resp)

混合查询 关键词 +向量

pip install llama-index-core llama-index-llms-ollama llama-index-embeddings-ollama

# 关键词方案二选一
pip install llama-index-retrievers-whoosh     # 路线① Whoosh

代码

vector_index = VectorStoreIndex.from_documents(documents)
//向量
from llama_index.retrievers.whoosh import WhooshRetriever

keyword_retriever = WhooshRetriever(
    index_dir="./whoosh_idx",
    docs=documents,
    top_k=5
)

//混合关键词 +向量
from llama_index.core.retrievers import QueryFusionRetriever

hybrid_retriever = QueryFusionRetriever(
    retrievers=[
        vector_index.as_retriever(similarity_top_k=5),
        keyword_retriever,
    ],
    retriever_weights=[0.7, 0.3],   # 向量权重高一点,可按场景调
    similarity_top_k=5,             # 最终只给 LLM 前 5 段
    num_queries=1,                  # 不对用户问题做改写
)

//查询
from llama_index.core.query_engine import RetrieverQueryEngine
//导入混合
query_engine = RetrieverQueryEngine(hybrid_retriever)

if __name__ == "__main__":
    print("=== 向量 + 关键词 联合召回 ===")
    while True:
        q = input("\n问题 (q退出): ").strip()
        if q.lower() == "q":
            break
        resp = query_engine.query(q)
        print("答:", resp)


SQLITE 补充 多次document插入 列表后向量化

使用列表来实现多文件插入,先读取多个文件到documents列表中 documents.extend(SimpleDirectoryReader(afile).load_data())之后再index = VectorStoreIndex.from_documents(documents)实现一次性的向量化

documents = []                       # 1. 先弄空列表
for _, afile, _ in bk:               # 2. 循环取路径
    if not os.path.isfile(afile):
        print(f'跳过不存在文件:{afile}')
        continue
    documents.extend(                # 3. 把每个文件拆出的文档追加进来
        SimpleDirectoryReader(afile).load_data()
    )

if not documents:                    # 4. 保险:确实读到东西了
    raise SystemExit('没有有效文档,无法构建索引!')

index = VectorStoreIndex.from_documents(documents)   # 5. 一次向量化
发表在 None | 留下评论

llama factory安装

miniconda即可

conda create -n llama_factory python=3.10
conda activate llama_factory
git clone --depth 1 https://github.com/hiyouga/LLaMA-Factory.git
cd LLaMA-Factory
pip install -e ".[torch,metrics]"

llamafactory-cli env             # 查看 PyTorch、CUDA、transformers 等版本  
llamafactory-cli version         # 只看自身版本号


 llamafactory-cli webui          # 默认 127.0.0.1:7860  
# 如果想让外网也能访问  
 llamafactory-cli webui --share
发表在 None | 留下评论