Compare commits

...

6 Commits

17 changed files with 886 additions and 271 deletions

4
.env
View File

@@ -6,7 +6,9 @@ DJANGO_ALLOWED_HOSTS=*
LLM_PROVIDER=openai_compatible LLM_PROVIDER=openai_compatible
LLM_API_KEY=sk-pgvkjondmmrlyxmrfhotgpuirgbtgzrpjpweorhwruflxmxw LLM_API_KEY=sk-pgvkjondmmrlyxmrfhotgpuirgbtgzrpjpweorhwruflxmxw
LLM_BASE_URL=https://api.siliconflow.cn/v1 LLM_BASE_URL=https://api.siliconflow.cn/v1
LLM_MODEL=Qwen/Qwen2.5-7B-Instruct LLM_MODEL=deepseek-ai/DeepSeek-V4-Pro
SILICONFLOW_EMBEDDING_MODEL=Qwen/Qwen3-Embedding-8B
SILICONFLOW_EMBEDDING_DIMENSIONS=4096
# SiliconFlow embedding model for RAG # SiliconFlow embedding model for RAG
EMBEDDING_API_KEY=sk-pgvkjondmmrlyxmrfhotgpuirgbtgzrpjpweorhwruflxmxw EMBEDDING_API_KEY=sk-pgvkjondmmrlyxmrfhotgpuirgbtgzrpjpweorhwruflxmxw

View File

@@ -119,7 +119,7 @@ REGULATORY_LLM_REVIEW_MAX_ATTEMPTS = int(os.environ.get("REGULATORY_LLM_REVIEW_M
REGULATORY_LLM_REVIEW_RETRY_DELAY_SECONDS = float(os.environ.get("REGULATORY_LLM_REVIEW_RETRY_DELAY_SECONDS", "0.5")) REGULATORY_LLM_REVIEW_RETRY_DELAY_SECONDS = float(os.environ.get("REGULATORY_LLM_REVIEW_RETRY_DELAY_SECONDS", "0.5"))
REGULATORY_LLM_REVIEW_TIMEOUT_SECONDS = float(os.environ.get("REGULATORY_LLM_REVIEW_TIMEOUT_SECONDS", "15")) REGULATORY_LLM_REVIEW_TIMEOUT_SECONDS = float(os.environ.get("REGULATORY_LLM_REVIEW_TIMEOUT_SECONDS", "15"))
SILICONFLOW_BASE_URL = os.environ.get("SILICONFLOW_BASE_URL", "https://api.siliconflow.cn/v1") SILICONFLOW_BASE_URL = os.environ.get("SILICONFLOW_BASE_URL", "https://api.siliconflow.cn/v1")
SILICONFLOW_API_KEY = os.environ.get("SILICONFLOW_API_KEY", "") SILICONFLOW_API_KEY = os.environ.get("SILICONFLOW_API_KEY", LLM_API_KEY)
SILICONFLOW_EMBEDDING_MODEL = os.environ.get( SILICONFLOW_EMBEDDING_MODEL = os.environ.get(
"SILICONFLOW_EMBEDDING_MODEL", "SILICONFLOW_EMBEDDING_MODEL",
"Qwen/Qwen3-Embedding-4B", "Qwen/Qwen3-Embedding-4B",

View File

@@ -1,115 +1,175 @@
# 架构搭建思路汇报稿(基于 Demo 版) # 架构搭建思路汇报稿(基于 Demo 版)
## 一、汇报开场 ## 一、设计路径:先锁规格,再实现代码
各位老师好,我本次 Demo 搭建的是一个面向体外诊断试剂注册资料准备与审核的智能体原型。 各位老师好,我本次 Demo 搭建的是一个面向体外诊断试剂注册资料准备与审核的智能体原型。
个 Demo 的目标不是简单做文件上传、文件解析或问答,而是把注册资料审核中几个高频、耗时、容易出错的环节串成一个可追溯的智能工作流,包括文件目录汇总、法规完整性核查、产品关键信息提取、申报表自动填充,以及异常风险预警 次开发没有直接从代码开始而是采用“文档先行、规格锁定、再实现代码”的路径。原因是注册资料审核不是一个简单问答场景它涉及文件解析、法规规则、RAG 依据、工作流状态、导出文件、人工确认和整改闭环。如果一开始就写代码,很容易出现功能能跑但边界不清、结果不可追溯、后续难维护的问题
从整体定位上看,它更像是一个“注册资料审核助手”:用户上传一批申报资料后,系统能够先把资料包结构化,再对照法规规则做核查,之后输出风险清单和整改建议,并把抽取到的产品信息继续复用到申报模板填表中。 所以整体设计路径分为四步:
## 二、Demo 运行结果展示
本次 Demo 目前可以展示四类核心运行结果。
### 1. 文件目录汇总表
用户上传注册资料文件夹、散装文件或压缩包后,系统会自动完成附件固化、压缩包解压、文件扫描和页数统计。
最终系统会生成 Markdown 汇总报告和 Excel 文件明细表,主要字段包括:
| 字段 | 说明 |
| --- | --- |
| 序号 | 文件在批次中的顺序 |
| 目录层级 | 文件所在的相对目录 |
| 文件名 | 原始文件名 |
| 类型 | PDF、Word、Excel、PPT 等文件类型 |
| 页数 | PDF 页数、Word 页数、PPT 幻灯片数或 Excel 工作表数 |
| 路径 | 文件在批次工作目录中的相对路径 |
| 状态 | success、failed、unsupported、uncertain 等 |
| 重试次数 | 页数统计失败时的重试记录 |
| 异常说明 | 不支持、不可确定或解析失败的原因 |
这个结果解决的是资料包进入系统后的第一步问题:先把杂乱的文件夹变成结构化的文件清单。
### 2. 法规完整性报告
在文件汇总结果基础上,系统会调用法规核查工作流,对照 NMPA 体外诊断试剂注册申报资料要求进行完整性检查。
Demo 中使用 `review_agent/regulatory_review/rules/nmpa_ivd_registration_v1.yaml` 作为结构化规则文件。规则文件中配置了附件 4 的资料要求,例如监管信息、综述资料、非临床资料、临床评价资料、说明书和标签样稿、质量管理体系文件等。
系统会检查是否缺少关键资料,例如:
| 检查对象 | 风险示例 |
| --- | --- |
| 注册申请表 | 缺失时生成阻断项或高风险 |
| 符合性声明 | 缺失时生成阻断项 |
| 产品技术要求 | 缺失时生成阻断项 |
| 注册检验报告 | 缺失时生成阻断项 |
| 产品说明书 | 缺失或章节不完整时生成高风险 |
| 标签样稿 | 缺失时生成高风险 |
| 临床评价资料 | 按适用条件生成条件性风险 |
| 质量管理体系文件 | 缺失时生成高风险 |
最终输出包括 Markdown 法规核查报告、Excel 问题清单和 JSON 结构化结果包。
### 3. 信息提取对照表
系统会从说明书、产品技术要求、注册检验报告、申请表等文件中抽取产品关键信息。
当前 Demo 中重点抽取的字段包括:
| 字段 | 用途 |
| --- | --- |
| 产品名称 | 用于一致性核查和申报表填充 |
| 型号规格 | 用于跨文件比对 |
| 预期用途 | 用于法规适用条件和模板填充 |
| 管理类别 | 用于法规判断 |
| 分类编码 | 用于注册资料核对 |
| 注册类型 | 用于模板选择和法规规则裁剪 |
| 临床评价路径 | 用于临床资料适用性判断 |
每个抽取结果都会保留来源文件、来源角色、证据片段、抽取方式和置信度。这样后续生成的填表内容不是黑盒结果,而是能够回溯到原始文件。
### 4. 异常预警列表
系统会把完整性缺失、章节异常、字段冲突、文本抽取失败、页数不可确定、通知失败等问题统一沉淀为风险项。
风险等级目前分为:
| 风险等级 | 含义 |
| --- | --- |
| 阻断项 | 影响注册资料完整性或关键合规判断,需要优先整改 |
| 高风险 | 可能影响审评,需要重点关注 |
| 中风险 | 建议整改或补充说明 |
| 低风险 | 轻微问题或格式提示 |
| 提示项 | 不直接影响结论,但建议人工确认 |
例如,如果系统发现不同文件中的“产品名称”或“型号规格”不一致,会生成一致性风险;如果缺少注册检验报告,会生成阻断项,并给出补充注册检验报告的整改建议。
## 三、智能体整体工作流
结合当前 Demo 的实现,智能体整体工作流可以概括为:
```text ```text
文件扫描 需求拆解
-> 目录汇总 -> 生成需求分析、功能设计、详细设计、数据库设计和开发计划
-> 法规匹配 -> 用文档锁定实现规格
-> 信息提取 -> 按规格实现 Django 代码、工作流、前端页面和测试
-> 一致性核查
-> 风险预警
-> 报告导出
-> 通知与整改复核
``` ```
从代码实现上看,系统拆成三条主链路 当前仓库中可以看到完整的规格文档链路
| 阶段 | 产物 | 作用 |
| --- | --- | --- |
| 需求分析 | `docs/1.需求分析` | 明确业务目标、用户动作、输入输出和异常场景 |
| 功能设计 | `docs/2.功能设计` | 把需求拆成文件汇总、法规核查、自动填表、飞书通知等模块 |
| 详细设计 | `docs/3.详细设计` | 锁定工作流节点、字段结构、状态流转和服务边界 |
| 数据库设计 | `docs/4.数据库设计` | 锁定批次、附件、节点、风险项、导出文件等模型 |
| 开发计划 | `docs/5.开发计划` | 将实现拆成可验证的开发任务和前端线框图 |
因此,这个 Demo 的核心不是“让大模型临时回答一个问题”,而是先用文档定义清楚系统应该如何工作,再把这些规格落实到代码、数据库、前端和测试中。最终形成的是一个可追溯、可复核、可继续扩展的审核工作台。
## 二、系统定位和 Demo 目标
这个 Demo 的目标不是简单做文件上传、文件解析或法规问答,而是把注册资料审核中几个高频、耗时、容易出错的环节串成一个智能工作流,包括:
```text
资料上传
-> 文件目录和页数汇总
-> NMPA 法规完整性核查
-> 法规依据 RAG 检索
-> 产品关键信息抽取
-> 一致性核查和风险预警
-> 申报文件自动填表
-> 报告导出和整改复核
```
从产品形态上看,它更像是一个“注册资料审核工作台”。用户上传一批申报资料后,系统先把资料包结构化,再按法规规则做核查,然后输出风险清单、整改建议、证据来源和导出文件。后续还可以继续复用抽取到的产品信息,自动填入申报模板。
## 三、技术栈和总体架构
本 Demo 采用轻量、可本地运行、便于测试和可解释的技术栈。
| 层级 | 技术/工具 | 作用 |
| --- | --- | --- |
| Web 框架 | Django | 路由、视图、模板、认证、ORM 和后台能力 |
| 数据库 | SQLite / Django ORM | Demo 阶段保存会话、附件、批次、节点、风险项和导出文件 |
| 前端 | Django Template + 原生 JS + CSS | 实现首页工作台、审核智能体、知识库管理、附件管理和流式对话 |
| 文件解析 | `pypdf``python-docx``python-pptx``openpyxl``xlrd``py7zr``zipfile` | 解析 PDF、Word、PPT、Excel、压缩包和旧 Office 文件 |
| 规则配置 | YAML | 维护 NMPA 体外诊断试剂注册资料核查规则 |
| RAG | ChromaDB + embedding provider | 构建法规材料向量索引,检索法规依据片段 |
| LLM | SiliconFlow / 可配置大模型接口 | 做意图路由、低置信度抽取、自然语言总结和辅助复核 |
| 流式交互 | SSE | 将工作流启动、节点进度和模型回复实时推给前端 |
| 自动化验证 | pytest + Django test client | 验证路由、页面、模型、工作流和导出结果 |
整体架构可以概括为:
```text
用户界面
-> Django 视图层
-> 对话服务和 Skill 路由器
-> 文件汇总 / 法规核查 / 自动填表工作流
-> ORM 状态记录和导出文件
-> RAG/LLM/规则服务
-> 前端工作流卡片和报告下载
```
这里的关键设计原则是规则判断要稳定RAG 负责补证据LLM 做辅助,不把高风险合规结论完全交给大模型自由发挥。
## 四、对话流程:先识别意图,再决定 RAG 或工作流
审核智能体页面不是单纯把用户输入直接发给大模型,而是有一层对话编排流程。
一次用户消息进入系统后,大致会经历以下步骤:
```text
用户输入
-> 保存用户消息
-> Skill Router 判断意图
-> 根据意图选择普通问答、附件读取或工作流
-> 必要时先检查附件和前置批次
-> 启动对应工作流或执行 RAG 问答
-> 保存助手回复和工作流事件
-> 前端通过 SSE 展示增量内容和节点状态
```
当前路由动作包括:
| action | 场景 | 后续动作 |
| --- | --- | --- |
| `normal_chat` | 普通法规问答或项目问答 | 先检索知识库,再把 RAG 片段放入大模型上下文 |
| `attachment_reader` | 用户要求阅读、提取、总结上传附件 | 调用附件读取 Skill返回文件内容摘要 |
| `file_summary` | 用户要求汇总文件目录、页数、清单 | 启动文件汇总工作流 |
| `regulatory_review` | 用户要求法规核查、完整性核查、风险预警、整改建议 | 必要时先生成文件汇总批次,再启动法规核查工作流 |
| `application_form_fill` | 用户要求申报文件填表、模板填充、安全和性能清单 | 必要时先生成文件汇总批次,再启动自动填表工作流 |
也就是说,普通问题是“先 RAG再回答”工作流问题是“先路由再检查前置条件再启动工作流”。例如用户问“注册检验报告要求是什么”系统会走 RAG 问答;用户说“请对当前资料做法规核查”,系统会进入法规核查工作流。
## 五、Skill 调用方式:路由器统一调度工具能力
Demo 中的 Skill 不是一个单独页面,而是对话服务后面的工具调用机制。用户不需要手动选择复杂功能,系统会根据用户话语和当前附件状态判断是否调用某个 Skill 或工作流。
当前实现中,`review_agent/skill_router.py` 负责意图路由。它采用两层判断:
```text
确定性规则预判
-> LLM 路由判断
-> 规则兜底
```
第一层是确定性规则。例如用户输入中包含“法规核查”“NMPA 核查”“风险预警”“自动填表”“申报模板”等明确关键词,系统可以直接判断要启动对应工作流。这样可以避免每次都依赖大模型判断。
第二层是 LLM 路由。系统会把用户消息和当前 active 附件列表发给路由模型,让模型只输出结构化 JSON
```json
{
"action": "regulatory_review",
"confidence": 0.9,
"reason": "用户要求对当前注册资料进行法规完整性核查"
}
```
第三层是规则兜底。如果 LLM 不可用、配置缺失或返回异常,系统会退回关键词和附件状态判断,保证 Demo 在本地环境也能稳定运行。
这个设计的好处是:用户体验上像是在和一个智能体对话,技术实现上则是由路由器把对话分发到不同工具、不同工作流和不同数据服务。
## 六、RAG 方式:法规依据和用户知识库共同参与
RAG 在 Demo 中有两类来源:
| 来源 | 说明 |
| --- | --- |
| 内置法规材料 | 来自 `docs/0.原始材料` 和 NMPA 相关法规文件,用于法规依据检索 |
| 用户管理知识库 | 由用户在“知识库管理”页面上传,可作为当前账号所有对话的补充知识 |
法规材料会被切分为文本块,写入 ChromaDB 向量库。每个 chunk 保留来源文件、chunk 编号、文本片段和元数据。embedding 支持真实语义 embedding也支持 deterministic/local embedding后者主要用于测试和 dry run。
RAG 在系统中的定位有两种:
### 1. 普通问答中的 RAG
如果用户提出普通问题,系统会先检索知识库,把命中的法规片段或用户知识库片段拼入上下文,再调用大模型回答。这样回答不会只依赖模型记忆,而是带有本地法规材料和用户资料依据。
```text
用户问题
-> 知识库检索
-> 过滤和排序相关片段
-> 组装为知识上下文
-> 调用 LLM 生成回答
```
### 2. 工作流中的 RAG
在法规核查工作流里RAG 不直接决定是否合规而是为规则判断补充法规依据。例如结构化规则已经判断“缺少注册检验报告”RAG 再检索相关法规要求,给出来源文件和依据片段。
这种方式避免了“让大模型自由判断合规”的不稳定性,同时让报告具备可解释依据。
## 七、三条核心工作流
当前 Demo 拆成三条主链路:文件汇总、法规核查、自动填表。
### 1. 文件汇总链路 ### 1. 文件汇总链路
对应模块:`review_agent/file_summary` 对应模块:`review_agent/file_summary`
主要流程为:
```text ```text
文件上传 文件上传
-> 附件固化 -> 附件固化
@@ -117,17 +177,17 @@ Demo 中使用 `review_agent/regulatory_review/rules/nmpa_ivd_registration_v1.ya
-> 文件扫描 -> 文件扫描
-> 页数统计 -> 页数统计
-> 产品名识别 -> 产品名识别
-> 报告输出 -> Markdown/Excel 报告输出
``` ```
这个链路的核心作用是把原始资料包转换成结构化数据。系统会生成 `FileSummaryBatch``FileSummaryItem`,后续法规核查和自动填表都复用这套文件清单,不再重复扫描文件 这个链路负责把原始资料包转换成结构化文件清单。系统会生成 `FileSummaryBatch``FileSummaryItem`,后续法规核查和自动填表都复用这套文件清单,不再重复扫描资料
输出字段包括序号、目录层级、文件名、文件类型、页数、相对路径、统计状态、重试次数和异常说明。
### 2. 法规核查链路 ### 2. 法规核查链路
对应模块:`review_agent/regulatory_review` 对应模块:`review_agent/regulatory_review`
主要流程为:
```text ```text
准备资料 准备资料
-> 适用条件确认 -> 适用条件确认
@@ -136,20 +196,20 @@ Demo 中使用 `review_agent/regulatory_review/rules/nmpa_ivd_registration_v1.ya
-> 文本抽取 -> 文本抽取
-> 章节核查 -> 章节核查
-> 一致性核查 -> 一致性核查
-> RAG 法规依据补充
-> 风险评估 -> 风险评估
-> 报告输出 -> 报告输出
-> 整改复核
``` ```
这条链路的核心设计原则是规则优先RAG 补依据LLM 做辅助 这条链路使用 `review_agent/regulatory_review/rules/nmpa_ivd_registration_v1.yaml` 作为结构化规则文件。规则中配置了附件 4 的资料要求,包括监管信息、综述资料、非临床资料、临床评价资料、说明书和标签样稿、质量管理体系文件等
也就是说法规结论不直接交给大模型自由判断而是优先由结构化规则文件决定RAG 负责检索法规依据和原文片段LLM 主要用于低置信度字段抽取、自然语言条件解析和结果复核 系统会检查是否缺少关键资料,例如注册申请表、符合性声明、产品技术要求、注册检验报告、说明书、标签样稿、临床评价资料和质量管理体系文件。缺失项会转成 `RegulatoryIssue`,并按阻断项、高风险、中风险、低风险和提示项分级
### 3. 自动填表链路 ### 3. 自动填表链路
对应模块:`review_agent/application_form_fill` 对应模块:`review_agent/application_form_fill`
主要流程为:
```text ```text
准备资料 准备资料
-> 模板选择 -> 模板选择
@@ -161,173 +221,91 @@ Demo 中使用 `review_agent/regulatory_review/rules/nmpa_ivd_registration_v1.ya
-> 结果通知 -> 结果通知
``` ```
这条链路会复用前面抽取到的产品信息,自动选择申报模板,并将字段填入 Word 模板。对于冲突字段Demo 中采用“说明书优先”的策略,同时在结果中保留冲突摘要和来源追溯。 这条链路会复用前面抽取到的产品信息,自动选择申报模板,并将字段填入 Word 模板。对于冲突字段Demo 中采用明确的归并策略,同时在结果中保留冲突摘要和来源追溯。
## 四、Demo 实际调用的关键工具和库 ## 八、页面和数据工作台
本 Demo 在工具选型上以轻量、可本地运行、可解释、便于测试为原则。 前端目前包括四个主要页面:
### 1. 文件解析类工具 | 页面 | URL | 作用 |
| 工具/库 | Demo 中的用途 | 选用理由 |
| --- | --- | --- | | --- | --- | --- |
| `pypdf` | PDF 页数统计和文本抽取 | 轻量、安装简单,适合 Demo 阶段快速处理 PDF | | 首页工作台 | `/` | 展示对话、附件、知识库、批次状态和最近处理记录 |
| `python-docx` | DOCX 文本读取、Word 模板填充 | 可读取段落和表格,也能写入 Word 模板 | | 审核智能体 | `/chat/` | 对话、上传附件、启动工作流、查看节点进度 |
| `python-pptx` | PPTX 幻灯片数量统计和文本读取 | 适合统计幻灯片数量和抽取文本 | | 知识库管理 | `/knowledge-base/` | 管理用户上传知识库、查看内置法规材料和索引状态 |
| `openpyxl` | XLSX 工作表统计、Excel 报告导出 | 同时支持读取和生成 Excel | | 附件管理 | `/attachments/` | 管理不同对话下的上传附件、版本、启用状态和下载 |
| `xlrd` | 旧版 XLS 文件读取 | 补充对历史 Excel 格式的支持 |
| `olefile` | 判断老 Office 文件 OLE 结构 | 用于 doc、xls、ppt 等老格式的兜底识别 |
| `py7zr` | 7z 压缩包解压 | 支持常见资料包压缩格式 |
| Python `zipfile` | ZIP 压缩包解压 | 标准库能力,无额外依赖 |
Demo 中没有选择重型 OCR 或复杂版式引擎,是因为当前阶段重点是打通审核链路和规则闭环。对于扫描件、图片 PDF、复杂版式 PDF后续可以再接入 OCR 和更强的版式解析能力。 首页工作台重点不是营销展示,而是运行态数据,包括:
### 2. 规则和正则
系统使用 YAML 维护法规规则,例如 `nmpa_ivd_registration_v1.yaml`。每条规则包含规则编码、附件 4 编码、标题、资料类型、风险等级、匹配关键词、整改建议和 RAG 检索查询词。
正则表达式用于抽取结构化字段,例如:
```text ```text
产品名称xxx 对话总数
型号规格xxx 附件总数
预期用途xxx 知识库材料数
管理类别xxx 执行中批次
分类编码xxx 已处理批次
成功批次
等待确认批次
失败批次
最近处理记录
``` ```
选用规则和正则的原因是:这类注册资料中有大量固定标题和固定字段,使用确定性规则可以提高可解释性,也便于定位问题来源 知识库材料中同时统计用户管理文档和内置法规材料,避免把“知识库”误解成只包含用户上传文件
### 3. RAG 和向量检索 ## 九、过程留痕和可追溯设计
Demo 使用 ChromaDB 构建本地法规 RAG 索引。法规原文材料会被切分为文本块并保存来源文件、chunk 编号等元数据 审核类系统不能只输出一个结论,还必须说明结论从哪里来。因此 Demo 对关键过程都做了结构化留痕
向量 embedding 支持两种模式:
| 模式 | 用途 |
| --- | --- |
| SiliconFlow embedding | 用于真实语义检索 |
| deterministic/local embedding | 用于测试和 dry run |
RAG 在系统中的定位不是直接判断合规而是为风险问题补充法规依据。例如完整性规则已经判断“缺少注册检验报告”RAG 再检索相关法规条款,输出来源文件和依据片段,增强报告的可解释性。
### 4. LLM 调用
LLM 在 Demo 中主要承担辅助角色,包括:
| 场景 | LLM 作用 |
| --- | --- |
| 自然语言适用条件解析 | 将用户输入转换为结构化字段 |
| 低置信度字段抽取 | 正则抽取不足时补充结构化 JSON |
| 工作流结果复核 | 对中间结果做总结和校验 |
| 整改建议润色 | 在规则模板基础上优化表达 |
风险等级、法规结论和完整性判断不直接交给 LLM 决定,而是由规则引擎和风险评估服务控制。
### 5. 工作流和状态管理
系统使用 Django ORM 保存批次、节点、事件和导出文件。
关键模型包括:
| 模型 | 作用 |
| --- | --- |
| `FileSummaryBatch` | 文件汇总批次 |
| `FileSummaryItem` | 文件明细 |
| `RegulatoryReviewBatch` | 法规核查批次 |
| `RegulatoryIssue` | 法规问题和风险项 |
| `RegulatoryArtifact` | 法规核查过程产物 |
| `ApplicationFormFillBatch` | 自动填表批次 |
| `WorkflowNodeRun` | 工作流节点状态 |
| `WorkflowEvent` | SSE 事件和进度记录 |
| `ExportedSummaryFile` | Markdown、Excel、JSON、Word 等导出文件 |
前端通过 SSE 事件实时展示工作流卡片状态,使用户能够看到每个节点是否正在执行、是否成功、是否等待确认或失败。
## 五、难点规则处理方式
### 1. 文件完整性检测
文件完整性检测的难点在于:注册资料不是固定文件名,企业可能用不同命名方式组织材料。
Demo 的处理方式是使用多层匹配:
```text
规则要求项
-> 文件名关键词匹配
-> 相对路径匹配
-> 目录层级匹配
-> 必要时结合首页文本和字段候选
```
例如规则中要求“注册检验报告”,系统不仅查找文件名中是否包含“注册检验报告”,也会查找路径和目录中是否包含“检验报告”“检测报告”等别名。
如果没有匹配到文件,系统会生成 `Finding`,再由风险评估服务转换为 `RegulatoryIssue`。这样完整性问题既能被结构化记录,也能进入最终风险报告。
### 2. 信息一致性核查
一致性核查的难点在于:同一个字段可能散落在说明书、注册检验报告、产品技术要求、申请表等多个文件中。
Demo 的处理方式是:
```text
文本抽取
-> 字段正则识别
-> 同字段归并
-> 不同取值比对
-> 生成一致性风险
```
例如系统会从多个文件中抽取“产品名称”“型号规格”“预期用途”等字段。如果同一字段出现多个不同值,系统会生成高风险问题,并在证据中记录每个取值对应的来源文件。
这类结果可以直接辅助人工审核人员定位冲突来源。
### 3. 法规条款匹配
法规条款匹配的难点在于:法规原文长、条款多,直接让大模型判断容易不稳定,纯规则又缺少解释能力。
Demo 采用“双层法规能力”:
| 层级 | 职责 |
| --- | --- |
| 结构化规则库 | 负责判断应有哪些文件、哪些章节、哪些字段,以及风险等级 |
| RAG 法规依据索引 | 负责检索法规原文片段,补充依据说明 |
这种设计的好处是:判断逻辑稳定,报告解释充分,后续规则也可以由法规人员维护。
### 4. 过程留痕和可追溯
审核类系统不能只输出一个结论,还必须说明结论从哪里来。
Demo 中对关键过程都做了留痕:
| 过程 | 留痕内容 | | 过程 | 留痕内容 |
| --- | --- | | --- | --- |
| 文件汇总 | 文件路径、页数、统计状态、异常说明 | | 对话 | 用户消息、助手消息、会话标题、更新时间 |
| 文本抽取 | 文本 hash、首页文本、章节候选、字段候选 | | 附件 | 原始文件名、版本号、启用状态、存储路径、文件大小 |
| 完整性核查 | 规则编码、匹配关键词、命中文件或缺失证据 | | 文件汇总 | 批次号、文件明细、页数、统计状态、异常说明 |
| 一致性核查 | 字段值、来源文件、冲突取值 | | 工作流节点 | 节点编码、节点名称、进度、状态、错误信息 |
| RAG 检索 | 法规来源、片段文本、检索分数 | | 法规核查 | 规则编码、缺失项、风险等级、证据、整改建议 |
| 报告导出 | Markdown、Excel、JSON 结果包 | | RAG 检索 | 来源文件、片段文本、相似度、chunk 元数据 |
| 自动填表 | 字段来源、冲突摘要、追溯清单 | | 自动填表 | 字段来源、冲突摘要、模板选择、追溯清单 |
| 导出文件 | Markdown、Excel、JSON、Word 等结果文件 |
这保证了 Demo 输出的结果不是一次性回答,而是可以复核、下载、整改和继续追踪的过程资产。 这保证了 Demo 输出的结果不是一次性回答,而是可以复核、下载、整改和继续追踪的过程资产。
## 六、总结 ## 十、Demo 可展示结果
本次 Demo 可以展示以下核心结果:
### 1. 文件目录汇总表
用户上传注册资料文件夹、散装文件或压缩包后,系统自动完成附件固化、解压、扫描和页数统计,最终生成 Markdown 汇总报告和 Excel 明细表。
### 2. 法规完整性报告
系统基于文件汇总结果和 NMPA 规则库做完整性核查,输出 Markdown 法规核查报告、Excel 问题清单和 JSON 结构化结果包。
### 3. 产品关键信息提取对照表
系统从说明书、产品技术要求、注册检验报告、申请表等文件中抽取产品名称、型号规格、预期用途、管理类别、分类编码、注册类型和临床评价路径,并保留来源文件和证据片段。
### 4. 风险预警列表
系统把完整性缺失、章节异常、字段冲突、文本抽取失败、页数不可确定、通知失败等问题统一沉淀为风险项,并按阻断项、高风险、中风险、低风险和提示项分级。
### 5. 申报文件自动填表结果
系统根据资料内容和适用条件选择模板,自动填充 Word 文件,并导出字段追溯清单,说明每个字段来自哪个文件、哪个证据片段。
## 十一、总结
整体来看,本 Demo 的架构搭建思路可以概括为: 整体来看,本 Demo 的架构搭建思路可以概括为:
```text ```text
结构化资料 用文档锁定规格
匹配法规 用规则结构化审核逻辑
抽取字段 用 RAG 补充法规依据
核查一致性 用 Skill Router 调度工具和工作流
输出风险和报告 用 ORM 和导出文件沉淀过程资产
最后支持填表和整改闭环 最后通过工作台页面呈现状态和结果
``` ```
它体现的是一个“资料输入、规则判断、证据追溯、风险输出、整改闭环”的智能体原型。 它体现的是一个“资料输入、规则判断、证据追溯、风险输出、整改闭环”的智能体原型。
当前 Demo 已经完成了文件汇总、法规完整性核查、信息抽取、风险预警、报告导出和自动填表主链路。后续如果继续增强,可以重点补充 OCR、扫描件识别、复杂 PDF 版式解析、规则后台维护、人工确认界面、飞书真实消息闭环,以及更完整的多智能体编排能力。 当前 Demo 已经完成了首页工作台、审核智能体对话、附件管理、知识库管理、文件汇总、法规核查、RAG 依据检索、风险预警、报告导出和自动填表主链路。后续如果继续增强,可以重点补充 OCR、扫描件识别、复杂 PDF 版式解析、规则后台维护、人工确认界面、飞书真实消息闭环,以及更完整的多智能体编排能力。
最终希望这个智能体能够从一个 Demo 原型,逐步演进为注册资料准备和审核过程中的智能协作平台。 最终希望这个智能体能够从一个 Demo 原型,逐步演进为注册资料准备和审核过程中的智能协作平台。

View File

@@ -1,4 +1,5 @@
from django.contrib.auth.decorators import login_required from django.contrib.auth.decorators import login_required
from django.db import transaction
from django.db.models import Count, Q from django.db.models import Count, Q
import json import json
import logging import logging
@@ -7,7 +8,14 @@ from pathlib import Path
from django.http import FileResponse, Http404, JsonResponse from django.http import FileResponse, Http404, JsonResponse
from django.views.decorators.http import require_http_methods from django.views.decorators.http import require_http_methods
from review_agent.models import ApplicationFormFillBatch, Conversation, ExportedSummaryFile, FileAttachment, Message from review_agent.models import (
ApplicationFormFillBatch,
Conversation,
ExportedSummaryFile,
FileAttachment,
Message,
RegulatoryReviewBatch,
)
from review_agent.models import FileSummaryBatch, WorkflowEvent from review_agent.models import FileSummaryBatch, WorkflowEvent
from review_agent.notifications.presenter import serialize_notification_records from review_agent.notifications.presenter import serialize_notification_records
from .events import serialize_event from .events import serialize_event
@@ -152,6 +160,9 @@ def conversation_list(request):
@login_required @login_required
def conversation_detail(request, conversation_id: int): def conversation_detail(request, conversation_id: int):
conversation = _conversation_for_user(request.user, conversation_id) conversation = _conversation_for_user(request.user, conversation_id)
with transaction.atomic():
ApplicationFormFillBatch.objects.filter(conversation=conversation).delete()
RegulatoryReviewBatch.objects.filter(conversation=conversation).delete()
conversation.delete() conversation.delete()
return JsonResponse({"ok": True, "conversation_id": conversation_id}) return JsonResponse({"ok": True, "conversation_id": conversation_id})

View File

@@ -10,8 +10,8 @@ from django.core.files.uploadedfile import UploadedFile
from review_agent.models import KnowledgeBaseDocument from review_agent.models import KnowledgeBaseDocument
from review_agent.regulatory_review.services.rag_citation import RagIndexUnavailable, retrieve_citations from review_agent.regulatory_review.services.rag_citation import RagIndexUnavailable, retrieve_citations
from review_agent.regulatory_review.services.rag_embedding import DeterministicEmbeddingProvider from review_agent.regulatory_review.services.rag_embedding import get_embedding_provider
from review_agent.regulatory_review.services.rag_index import chunk_text, extract_text_from_path from review_agent.regulatory_review.services.rag_index import chunk_text, extract_text_from_path, is_excluded_source_path
from review_agent.regulatory_review.services.rule_loader import DEFAULT_RULE_PATH, compute_file_sha256, load_rule_file from review_agent.regulatory_review.services.rule_loader import DEFAULT_RULE_PATH, compute_file_sha256, load_rule_file
@@ -78,6 +78,8 @@ def list_source_documents(source_dir: Path) -> list[dict[str, Any]]:
continue continue
suffix = path.suffix.lower() suffix = path.suffix.lower()
relative_path = str(path.relative_to(source_dir)) relative_path = str(path.relative_to(source_dir))
if is_excluded_source_path(relative_path):
continue
indexed_chunk_count = source_chunk_counts.get(relative_path, 0) indexed_chunk_count = source_chunk_counts.get(relative_path, 0)
documents.append( documents.append(
{ {
@@ -101,7 +103,7 @@ def search_knowledge_base(query: str, *, n_results: int = 3) -> dict[str, Any]:
try: try:
results = retrieve_citations( results = retrieve_citations(
normalized, normalized,
embedding_provider=DeterministicEmbeddingProvider(), embedding_provider=get_embedding_provider(),
n_results=n_results, n_results=n_results,
) )
except RagIndexUnavailable as exc: except RagIndexUnavailable as exc:
@@ -151,6 +153,7 @@ def create_document_from_upload(
def update_document(document: KnowledgeBaseDocument, payload: dict[str, Any]) -> KnowledgeBaseDocument: def update_document(document: KnowledgeBaseDocument, payload: dict[str, Any]) -> KnowledgeBaseDocument:
update_fields = [] update_fields = []
active_changed = False
if "display_name" in payload: if "display_name" in payload:
document.display_name = str(payload.get("display_name") or "").strip() or document.original_name document.display_name = str(payload.get("display_name") or "").strip() or document.original_name
update_fields.append("display_name") update_fields.append("display_name")
@@ -158,12 +161,21 @@ def update_document(document: KnowledgeBaseDocument, payload: dict[str, Any]) ->
document.description = str(payload.get("description") or "").strip() document.description = str(payload.get("description") or "").strip()
update_fields.append("description") update_fields.append("description")
if "is_active" in payload: if "is_active" in payload:
document.is_active = bool(payload.get("is_active")) next_is_active = bool(payload.get("is_active"))
document.status = KnowledgeBaseDocument.Status.ACTIVE if document.is_active else KnowledgeBaseDocument.Status.DISABLED active_changed = document.is_active != next_is_active
document.is_active = next_is_active
document.status = KnowledgeBaseDocument.Status.ACTIVE if next_is_active else KnowledgeBaseDocument.Status.DISABLED
update_fields.extend(["is_active", "status"]) update_fields.extend(["is_active", "status"])
if not next_is_active:
remove_managed_document_from_index(document)
document.indexed_chunk_count = 0
document.metadata = {**(document.metadata or {}), "index_status": "disabled", "index_error": ""}
update_fields.extend(["indexed_chunk_count", "metadata"])
if update_fields: if update_fields:
update_fields.append("updated_at") update_fields.append("updated_at")
document.save(update_fields=update_fields) document.save(update_fields=update_fields)
if active_changed and document.is_active:
index_managed_document(document)
return document return document
@@ -196,6 +208,12 @@ def serialize_document(document: KnowledgeBaseDocument) -> dict[str, Any]:
def index_managed_document(document: KnowledgeBaseDocument) -> int: def index_managed_document(document: KnowledgeBaseDocument) -> int:
if document.status != KnowledgeBaseDocument.Status.ACTIVE or not document.is_active:
remove_managed_document_from_index(document)
document.indexed_chunk_count = 0
document.metadata = {**(document.metadata or {}), "index_status": "disabled", "index_error": ""}
document.save(update_fields=["indexed_chunk_count", "metadata", "updated_at"])
return 0
path = Path(document.storage_path) path = Path(document.storage_path)
if not path.is_absolute(): if not path.is_absolute():
path = Path(settings.MEDIA_ROOT) / document.storage_path path = Path(settings.MEDIA_ROOT) / document.storage_path
@@ -210,7 +228,7 @@ def index_managed_document(document: KnowledgeBaseDocument) -> int:
return 0 return 0
collection = _load_chroma_collection() collection = _load_chroma_collection()
texts = [chunk.text for chunk in chunks] texts = [chunk.text for chunk in chunks]
embeddings = DeterministicEmbeddingProvider()(texts) embeddings = get_embedding_provider()(texts)
ids = [ ids = [
hashlib.sha256(f"managed:{document.pk}:{chunk.metadata['chunk_index']}".encode("utf-8")).hexdigest() hashlib.sha256(f"managed:{document.pk}:{chunk.metadata['chunk_index']}".encode("utf-8")).hexdigest()
for chunk in chunks for chunk in chunks

View File

@@ -23,7 +23,7 @@ class Command(BaseCommand):
raise CommandError(f"法规材料目录不存在:{source_dir}") raise CommandError(f"法规材料目录不存在:{source_dir}")
try: try:
provider = get_embedding_provider(options["provider"]) provider = get_embedding_provider(options["provider"])
count = build_chroma_index(source_dir=source_dir, embedding_provider=provider) count = build_chroma_index(source_dir=source_dir, embedding_provider=provider, reset=True)
except Exception as exc: except Exception as exc:
raise CommandError(str(exc)) from exc raise CommandError(str(exc)) from exc
self.stdout.write( self.stdout.write(

View File

@@ -23,6 +23,8 @@ from .rag_embedding import EmbeddingFunction
logger = logging.getLogger("review_agent.regulatory_review.rag_index") logger = logging.getLogger("review_agent.regulatory_review.rag_index")
EXCLUDED_SOURCE_KEYWORDS = ("模拟题二", "试剂盒临床注册文件准备与审核Agent")
@dataclass(frozen=True) @dataclass(frozen=True)
class TextChunk: class TextChunk:
@@ -227,6 +229,8 @@ def collect_source_chunks(source_dir: Path) -> list[TextChunk]:
for path in sorted(source_dir.rglob("*")): for path in sorted(source_dir.rglob("*")):
if not path.is_file(): if not path.is_file():
continue continue
if is_excluded_source_path(path.relative_to(source_dir)):
continue
try: try:
text = extract_text_from_path(path) text = extract_text_from_path(path)
except RuntimeError as exc: except RuntimeError as exc:
@@ -238,6 +242,11 @@ def collect_source_chunks(source_dir: Path) -> list[TextChunk]:
return chunks return chunks
def is_excluded_source_path(path: Path | str) -> bool:
normalized = str(path)
return any(keyword in normalized for keyword in EXCLUDED_SOURCE_KEYWORDS)
def _is_attachment4(path: Path) -> bool: def _is_attachment4(path: Path) -> bool:
normalized = path.name.replace(" ", "") normalized = path.name.replace(" ", "")
return "附件4" in normalized and "体外诊断试剂注册申报资料要求及说明" in normalized return "附件4" in normalized and "体外诊断试剂注册申报资料要求及说明" in normalized
@@ -249,6 +258,7 @@ def build_chroma_index(
embedding_provider: EmbeddingFunction, embedding_provider: EmbeddingFunction,
persist_path: Path | None = None, persist_path: Path | None = None,
collection_name: str | None = None, collection_name: str | None = None,
reset: bool = False,
) -> int: ) -> int:
try: try:
import chromadb import chromadb
@@ -259,7 +269,22 @@ def build_chroma_index(
collection_name = collection_name or settings.REGULATORY_RAG_COLLECTION collection_name = collection_name or settings.REGULATORY_RAG_COLLECTION
persist_path.mkdir(parents=True, exist_ok=True) persist_path.mkdir(parents=True, exist_ok=True)
chunks = collect_source_chunks(source_dir) chunks = collect_source_chunks(source_dir)
try:
client = chromadb.PersistentClient(path=str(persist_path)) client = chromadb.PersistentClient(path=str(persist_path))
except Exception:
if not reset:
raise
clear_chroma_system_cache()
clear_chroma_index_dir(persist_path)
persist_path.mkdir(parents=True, exist_ok=True)
client = chromadb.PersistentClient(path=str(persist_path))
if reset:
try:
client.delete_collection(collection_name)
clear_chroma_system_cache()
client = chromadb.PersistentClient(path=str(persist_path))
except Exception:
pass
collection = client.get_or_create_collection(collection_name) collection = client.get_or_create_collection(collection_name)
if not chunks: if not chunks:
return 0 return 0
@@ -276,3 +301,22 @@ def build_chroma_index(
embeddings=embeddings, embeddings=embeddings,
) )
return len(chunks) return len(chunks)
def clear_chroma_index_dir(persist_path: Path | str | None = None) -> None:
chroma_path = Path(persist_path or settings.REGULATORY_RAG_CHROMA_PATH).resolve()
media_root = Path(settings.MEDIA_ROOT).resolve()
try:
chroma_path.relative_to(media_root)
except ValueError as exc:
raise RuntimeError("法规 RAG 索引目录必须位于 MEDIA_ROOT 内。") from exc
if chroma_path.exists():
shutil.rmtree(chroma_path)
def clear_chroma_system_cache() -> None:
try:
from chromadb.api.shared_system_client import SharedSystemClient
except Exception:
return
SharedSystemClient.clear_system_cache()

View File

@@ -108,6 +108,9 @@ def send_message(conversation: Conversation, content: str) -> tuple[Message, Mes
user_message = append_user_message(conversation, content) user_message = append_user_message(conversation, content)
knowledge_context = build_knowledge_context(content) knowledge_context = build_knowledge_context(content)
if should_refuse_ungrounded_chat(conversation, content, knowledge_context):
reply_content = out_of_scope_reply()
else:
try: try:
reply_content = generate_reply(conversation, content, knowledge_context=knowledge_context) reply_content = generate_reply(conversation, content, knowledge_context=knowledge_context)
except (LLMConfigurationError, LLMRequestError) as exc: except (LLMConfigurationError, LLMRequestError) as exc:
@@ -127,6 +130,31 @@ def stream_message(conversation: Conversation, content: str):
user_message = append_user_message(conversation, content) user_message = append_user_message(conversation, content)
assistant_parts: list[str] = [] assistant_parts: list[str] = []
knowledge_context = build_knowledge_context(content)
if should_refuse_ungrounded_chat(conversation, content, knowledge_context):
reply_content = out_of_scope_reply()
assistant_message = append_assistant_message(conversation, reply_content)
yield sse_event(
"meta",
{
"conversation_id": conversation.pk,
"title": conversation.title or build_conversation_title(content),
"user_message_id": user_message.pk,
"user_message": user_message.content,
},
)
yield sse_event("chunk", {"delta": reply_content})
yield sse_event(
"done",
{
"assistant_message_id": assistant_message.pk,
"conversation_id": conversation.pk,
"title": conversation.title,
},
)
return
route = route_message_intent(conversation, content) route = route_message_intent(conversation, content)
logger.info( logger.info(
"Stream message started", "Stream message started",
@@ -395,7 +423,6 @@ def stream_message(conversation: Conversation, content: str):
stream_failed = False stream_failed = False
stream_error = "" stream_error = ""
knowledge_context = build_knowledge_context(content)
try: try:
for chunk in stream_reply(conversation, content, knowledge_context=knowledge_context): for chunk in stream_reply(conversation, content, knowledge_context=knowledge_context):
assistant_parts.append(chunk) assistant_parts.append(chunk)
@@ -497,6 +524,76 @@ def build_knowledge_context(content: str, *, n_results: int = 5) -> str:
return "\n\n".join(lines) return "\n\n".join(lines)
def should_refuse_ungrounded_chat(
conversation: Conversation,
content: str,
knowledge_context: str = "",
) -> bool:
if (knowledge_context or "").strip():
return False
if _is_business_related_question(content):
return False
if _has_active_attachments(conversation):
return False
return True
def out_of_scope_reply() -> str:
return (
"没有在当前启用的知识库材料中找到可依据的内容,且这个问题与当前主营业务无关。"
"为避免编造,我不能直接回答。请先上传或启用相关知识库材料,或改问体外诊断试剂注册资料审核、"
"文件汇总、法规核查、申报填表等业务范围内的问题。"
)
def _is_business_related_question(content: str) -> bool:
normalized = (content or "").lower()
compact = "".join(normalized.split())
if not compact:
return True
business_keywords = [
"审核智能体",
"体外诊断",
"ivd",
"nmpa",
"cmde",
"医疗器械",
"注册资料",
"注册申报",
"注册检验",
"注册证",
"申报资料",
"申报文件",
"法规",
"核查",
"审评",
"审核",
"整改",
"风险",
"说明书",
"临床",
"性能",
"安全",
"适用范围",
"预期用途",
"附件",
"文件",
"压缩包",
"目录",
"页数",
"清单",
"汇总",
"模板",
"填表",
"知识库",
"检索",
"报告",
"材料",
"资料",
]
return any(keyword in compact for keyword in business_keywords)
def build_filename_matched_document_context(query: str, *, max_chars: int = 12000) -> str: def build_filename_matched_document_context(query: str, *, max_chars: int = 12000) -> str:
terms = _knowledge_query_terms(query) terms = _knowledge_query_terms(query)
if not terms: if not terms:

View File

@@ -25,6 +25,7 @@ from .views import (
knowledge_base_document_detail, knowledge_base_document_detail,
knowledge_base_document_index, knowledge_base_document_index,
knowledge_base_documents, knowledge_base_documents,
knowledge_base_rebuild_index,
knowledge_base_search, knowledge_base_search,
knowledge_base_status, knowledge_base_status,
) )
@@ -121,6 +122,11 @@ urlpatterns = [
knowledge_base_search, knowledge_base_search,
name="knowledge_base_search", name="knowledge_base_search",
), ),
path(
"api/review-agent/knowledge-base/rebuild-index/",
knowledge_base_rebuild_index,
name="knowledge_base_rebuild_index",
),
path( path(
"api/review-agent/knowledge-base/documents/", "api/review-agent/knowledge-base/documents/",
knowledge_base_documents, knowledge_base_documents,

View File

@@ -1,6 +1,8 @@
from django.contrib.auth.decorators import login_required from django.contrib.auth.decorators import login_required
from django.conf import settings
from django.db.models import Count, Q, Sum from django.db.models import Count, Q, Sum
import json import json
from pathlib import Path
from django.http import HttpRequest, HttpResponse, JsonResponse, StreamingHttpResponse from django.http import HttpRequest, HttpResponse, JsonResponse, StreamingHttpResponse
from django.shortcuts import redirect, render from django.shortcuts import redirect, render
@@ -27,6 +29,9 @@ from .knowledge_base import (
) )
from .models import KnowledgeBaseDocument from .models import KnowledgeBaseDocument
from .regulatory_review.services.info_extract import ensure_regulatory_condition_candidates from .regulatory_review.services.info_extract import ensure_regulatory_condition_candidates
from .regulatory_review.services.rag_embedding import get_embedding_provider
from .regulatory_review.services.rag_index import build_chroma_index
from .regulatory_review.services.rule_loader import load_rule_file
@login_required @login_required
@@ -151,6 +156,24 @@ def knowledge_base_status(request: HttpRequest) -> JsonResponse:
return JsonResponse(build_knowledge_base_context_for_user(request.user)) return JsonResponse(build_knowledge_base_context_for_user(request.user))
@login_required
@require_http_methods(["POST"])
def knowledge_base_rebuild_index(request: HttpRequest) -> JsonResponse:
payload = rebuild_knowledge_base_index()
return JsonResponse({"knowledge_base": build_knowledge_base_context_for_user(request.user), **payload})
def rebuild_knowledge_base_index() -> dict[str, object]:
rule_set = load_rule_file()
source_dir = Path(settings.BASE_DIR) / rule_set["source_material_dir"]
chunk_count = build_chroma_index(
source_dir=source_dir,
embedding_provider=get_embedding_provider(),
reset=True,
)
return {"chunk_count": chunk_count}
@login_required @login_required
@require_http_methods(["POST"]) @require_http_methods(["POST"])
def knowledge_base_search(request: HttpRequest) -> JsonResponse: def knowledge_base_search(request: HttpRequest) -> JsonResponse:

View File

@@ -15,6 +15,8 @@
var sourceTable = document.getElementById("knowledgeSourceTable"); var sourceTable = document.getElementById("knowledgeSourceTable");
var documentFileInput = document.getElementById("knowledgeDocumentFile"); var documentFileInput = document.getElementById("knowledgeDocumentFile");
var uploadDropzone = document.getElementById("knowledgeUploadDropzone"); var uploadDropzone = document.getElementById("knowledgeUploadDropzone");
var rebuildButton = document.getElementById("knowledgeRebuildIndexButton");
var rebuildStatus = document.getElementById("knowledgeRebuildStatus");
function csrfToken() { function csrfToken() {
var cookie = document.cookie.split("; ").find(function (item) { var cookie = document.cookie.split("; ").find(function (item) {
@@ -68,6 +70,17 @@
return response.json(); return response.json();
} }
async function rebuildIndex() {
var response = await fetch(page.getAttribute("data-rebuild-url"), {
method: "POST",
headers: { "X-CSRFToken": csrfToken() },
});
if (!response.ok) {
throw new Error("法规索引重建失败。");
}
return response.json();
}
function renderResults(payload) { function renderResults(payload) {
if (!results) { if (!results) {
return; return;
@@ -196,6 +209,59 @@
}); });
} }
async function handleRebuild(trigger) {
if (!page.getAttribute("data-rebuild-url")) {
return;
}
var originalText = trigger ? trigger.textContent : "";
if (trigger) {
trigger.disabled = true;
trigger.textContent = "入库中";
}
if (rebuildButton && trigger !== rebuildButton) {
rebuildButton.disabled = true;
}
if (rebuildStatus) {
rebuildStatus.textContent = "正在重建法规 RAG 索引...";
}
try {
var payload = await rebuildIndex();
if (rebuildStatus) {
rebuildStatus.textContent = "重建完成,入库片段 " + (payload.chunk_count || 0) + " 个。";
}
window.setTimeout(function () {
window.location.reload();
}, 600);
} catch (error) {
if (rebuildStatus) {
rebuildStatus.textContent = error.message || "法规索引重建失败。";
}
if (trigger) {
trigger.disabled = false;
trigger.textContent = originalText;
}
if (rebuildButton) {
rebuildButton.disabled = false;
}
}
}
if (rebuildButton) {
rebuildButton.addEventListener("click", function () {
handleRebuild(rebuildButton);
});
}
if (sourceTable) {
sourceTable.addEventListener("click", function (event) {
var button = event.target.closest("[data-source-action='index']");
if (!button) {
return;
}
handleRebuild(button);
});
}
if (searchForm && queryInput) { if (searchForm && queryInput) {
searchForm.addEventListener("submit", async function (event) { searchForm.addEventListener("submit", async function (event) {
event.preventDefault(); event.preventDefault();

View File

@@ -32,6 +32,7 @@
class="knowledge-page" class="knowledge-page"
data-document-url="{% url 'knowledge_base_document_list' %}" data-document-url="{% url 'knowledge_base_document_list' %}"
data-search-url="{% url 'knowledge_base_search' %}" data-search-url="{% url 'knowledge_base_search' %}"
data-rebuild-url="{% url 'knowledge_base_rebuild_index' %}"
> >
<header class="attachment-manager-hero attachment-manager-toolbar"> <header class="attachment-manager-hero attachment-manager-toolbar">
<div> <div>
@@ -96,9 +97,10 @@
</div> </div>
</dl> </dl>
<p class="knowledge-panel-note">{{ knowledge_base.status.message }}</p> <p class="knowledge-panel-note">{{ knowledge_base.status.message }}</p>
<p class="upload-status" id="knowledgeRebuildStatus"></p>
<div class="knowledge-form-actions"> <div class="knowledge-form-actions">
<button type="button" onclick="window.location.reload()">刷新状态</button> <button type="button" onclick="window.location.reload()">刷新状态</button>
<button type="button" disabled>重建索引</button> <button type="button" id="knowledgeRebuildIndexButton">重建索引</button>
</div> </div>
</section> </section>
@@ -182,6 +184,7 @@
<th>类型</th> <th>类型</th>
<th>大小</th> <th>大小</th>
<th>索引</th> <th>索引</th>
<th>操作</th>
</tr> </tr>
</thead> </thead>
<tbody> <tbody>
@@ -192,10 +195,13 @@
<td>{{ source.suffix }}</td> <td>{{ source.suffix }}</td>
<td>{{ source.size }} bytes</td> <td>{{ source.size }} bytes</td>
<td>{{ source.indexed_label }}</td> <td>{{ source.indexed_label }}</td>
<td class="attachment-actions">
<button type="button" data-source-action="index">手动入库</button>
</td>
</tr> </tr>
{% empty %} {% empty %}
<tr> <tr>
<td colspan="5" class="table-empty">暂无法规材料</td> <td colspan="6" class="table-empty">暂无法规材料</td>
</tr> </tr>
{% endfor %} {% endfor %}
</tbody> </tbody>
@@ -209,5 +215,5 @@
{% endblock %} {% endblock %}
{% block scripts %} {% block scripts %}
<script src="{% static 'js/knowledge_base.js' %}?v=20260608-kb5"></script> <script src="{% static 'js/knowledge_base.js' %}?v=20260608-kb6"></script>
{% endblock %} {% endblock %}

View File

@@ -1,7 +1,7 @@
import pytest import pytest
from review_agent.models import KnowledgeBaseDocument from review_agent.models import KnowledgeBaseDocument
from review_agent.services import build_knowledge_context from review_agent.services import build_knowledge_context, send_message, stream_message
pytestmark = pytest.mark.django_db pytestmark = pytest.mark.django_db
@@ -57,3 +57,67 @@ def test_build_knowledge_context_uses_full_document_when_name_matches(settings,
assert "全文材料" in context assert "全文材料" in context
assert "来源:用户知识库/孙之烨-260510.txt" in context assert "来源:用户知识库/孙之烨-260510.txt" in context
assert "完整经历:曾组织技术分享并带队参加竞赛" in context assert "完整经历:曾组织技术分享并带队参加竞赛" in context
def test_send_message_refuses_out_of_scope_answer_without_knowledge_context(monkeypatch, django_user_model):
from review_agent.models import Conversation
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
monkeypatch.setattr(
"review_agent.services.search_knowledge_base",
lambda query, n_results=5: {"query": query, "results": [], "error_message": ""},
)
monkeypatch.setattr(
"review_agent.services.generate_reply",
lambda *args, **kwargs: pytest.fail("out-of-scope answer without knowledge context must not call LLM"),
)
_, assistant_message = send_message(conversation, "孙之烨是谁")
assert "没有在当前启用的知识库材料中找到" in assistant_message.content
assert "与当前主营业务无关" in assistant_message.content
def test_stream_message_refuses_out_of_scope_answer_without_knowledge_context(monkeypatch, django_user_model):
from review_agent.models import Conversation
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
monkeypatch.setattr(
"review_agent.services.search_knowledge_base",
lambda query, n_results=5: {"query": query, "results": [], "error_message": ""},
)
monkeypatch.setattr(
"review_agent.services.stream_reply",
lambda *args, **kwargs: pytest.fail("out-of-scope answer without knowledge context must not call streaming LLM"),
)
monkeypatch.setattr(
"review_agent.services.generate_reply",
lambda *args, **kwargs: pytest.fail("out-of-scope answer without knowledge context must not call fallback LLM"),
)
frames = list(stream_message(conversation, "给我一份红烧肉菜谱"))
assert any("没有在当前启用的知识库材料中找到" in frame for frame in frames)
assert any("与当前主营业务无关" in frame for frame in frames)
assert any("done" in frame for frame in frames)
def test_business_question_without_knowledge_context_can_use_llm(monkeypatch, django_user_model):
from review_agent.models import Conversation
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
monkeypatch.setattr(
"review_agent.services.search_knowledge_base",
lambda query, n_results=5: {"query": query, "results": [], "error_message": ""},
)
monkeypatch.setattr(
"review_agent.services.generate_reply",
lambda *args, **kwargs: "注册检验报告通常用于证明产品性能符合要求。",
)
_, assistant_message = send_message(conversation, "注册检验报告有什么作用")
assert "注册检验报告" in assistant_message.content

View File

@@ -10,6 +10,7 @@ from review_agent.models import (
FileAttachment, FileAttachment,
FileSummaryBatch, FileSummaryBatch,
Message, Message,
RegulatoryReviewBatch,
WorkflowNodeRun, WorkflowNodeRun,
) )
@@ -269,6 +270,39 @@ def test_conversation_delete_api_removes_owned_conversation(client, django_user_
assert Conversation.objects.filter(pk=other_conversation.pk).exists() assert Conversation.objects.filter(pk=other_conversation.pk).exists()
def test_conversation_delete_api_removes_protected_workflow_dependents(client, django_user_model):
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="待删除")
summary_batch = FileSummaryBatch.objects.create(
conversation=conversation,
user=user,
batch_no="FS-DELETE-PROTECTED",
)
regulatory_batch = RegulatoryReviewBatch.objects.create(
conversation=conversation,
user=user,
source_summary_batch=summary_batch,
batch_no="RR-DELETE-PROTECTED",
)
form_batch = ApplicationFormFillBatch.objects.create(
conversation=conversation,
user=user,
source_summary_batch=summary_batch,
source_regulatory_batch=regulatory_batch,
batch_no="AFF-DELETE-PROTECTED",
)
client.force_login(user)
response = client.delete(reverse("review_agent_conversation_detail", args=[conversation.pk]))
assert response.status_code == 200
assert response.json()["ok"] is True
assert not Conversation.objects.filter(pk=conversation.pk).exists()
assert not FileSummaryBatch.objects.filter(pk=summary_batch.pk).exists()
assert not RegulatoryReviewBatch.objects.filter(pk=regulatory_batch.pk).exists()
assert not ApplicationFormFillBatch.objects.filter(pk=form_batch.pk).exists()
def test_conversation_delete_api_rejects_unowned_conversation(client, django_user_model): def test_conversation_delete_api_rejects_unowned_conversation(client, django_user_model):
user = django_user_model.objects.create_user(username="owner", password="pass") user = django_user_model.objects.create_user(username="owner", password="pass")
other = django_user_model.objects.create_user(username="other", password="pass") other = django_user_model.objects.create_user(username="other", password="pass")

View File

@@ -286,7 +286,7 @@ def test_stream_message_falls_back_to_non_stream_reply_when_stream_breaks(monkey
lambda conversation, content, knowledge_context="": "非流式完整回复", lambda conversation, content, knowledge_context="": "非流式完整回复",
) )
frames = list(stream_message(conversation, "普通问题")) frames = list(stream_message(conversation, "注册检验报告审核要点有哪些"))
joined = "".join(frames) joined = "".join(frames)
assert "已生成部分内容" in joined assert "已生成部分内容" in joined

View File

@@ -2,7 +2,14 @@ import pytest
from django.core.files.uploadedfile import SimpleUploadedFile from django.core.files.uploadedfile import SimpleUploadedFile
from django.urls import reverse from django.urls import reverse
from review_agent.knowledge_base import build_knowledge_base_context, delete_document, search_knowledge_base from review_agent.knowledge_base import (
build_knowledge_base_context,
delete_document,
index_managed_document,
search_knowledge_base,
update_document,
)
from review_agent.views import rebuild_knowledge_base_index
from review_agent.models import KnowledgeBaseDocument from review_agent.models import KnowledgeBaseDocument
@@ -16,6 +23,7 @@ def test_knowledge_base_context_reports_rule_and_sources():
assert context["rule"]["requirement_count"] > 0 assert context["rule"]["requirement_count"] > 0
assert context["source_count"] > 0 assert context["source_count"] > 0
assert context["collection_name"] == "nmpa_ivd_registration_v1" assert context["collection_name"] == "nmpa_ivd_registration_v1"
assert not any("模拟题二" in source["relative_path"] for source in context["sources"])
def test_knowledge_base_page_requires_login(client): def test_knowledge_base_page_requires_login(client):
@@ -36,6 +44,11 @@ def test_knowledge_base_page_renders_for_user(client, django_user_model):
content = response.content.decode("utf-8") content = response.content.decode("utf-8")
tabbar = content[content.index('<div class="tabbar"') : content.index("</div>", content.index('<div class="tabbar"'))] tabbar = content[content.index('<div class="tabbar"') : content.index("</div>", content.index('<div class="tabbar"'))]
assert tabbar.index("审核智能体") < tabbar.index("知识库管理") < tabbar.index("附件管理") assert tabbar.index("审核智能体") < tabbar.index("知识库管理") < tabbar.index("附件管理")
assert "data-rebuild-url=" in content
assert 'id="knowledgeRebuildIndexButton"' in content
assert "重建索引" in content
assert 'data-source-action="index"' in content
assert "手动入库" in content
def test_knowledge_base_status_api(client, django_user_model): def test_knowledge_base_status_api(client, django_user_model):
@@ -48,6 +61,53 @@ def test_knowledge_base_status_api(client, django_user_model):
assert response.json()["rule"]["code"] == "nmpa_ivd_registration_v1" assert response.json()["rule"]["code"] == "nmpa_ivd_registration_v1"
def test_knowledge_base_rebuild_index_api(client, django_user_model, monkeypatch):
user = django_user_model.objects.create_user(username="owner", password="pass")
client.force_login(user)
calls = []
monkeypatch.setattr(
"review_agent.views.rebuild_knowledge_base_index",
lambda: calls.append("rebuild") or {"chunk_count": 12},
)
response = client.post(reverse("knowledge_base_rebuild_index"))
assert response.status_code == 200
assert response.json()["chunk_count"] == 12
assert response.json()["knowledge_base"]["collection"]["count"] >= 0
assert calls == ["rebuild"]
def test_rebuild_knowledge_base_index_requests_reset(settings, tmp_path, monkeypatch):
settings.MEDIA_ROOT = tmp_path
settings.REGULATORY_RAG_CHROMA_PATH = tmp_path / "chroma"
settings.REGULATORY_RAG_CHROMA_PATH.mkdir()
stale_file = settings.REGULATORY_RAG_CHROMA_PATH / "chroma.sqlite3"
stale_file.write_text("stale", encoding="utf-8")
calls = []
monkeypatch.setattr("review_agent.views.load_rule_file", lambda: {"source_material_dir": "docs/0.原始材料"})
monkeypatch.setattr("review_agent.views.get_embedding_provider", lambda: "provider")
monkeypatch.setattr(
"review_agent.views.build_chroma_index",
lambda source_dir, embedding_provider, reset=False: calls.append(
{
"source_dir": source_dir,
"embedding_provider": embedding_provider,
"reset": reset,
}
)
or 8,
)
payload = rebuild_knowledge_base_index()
assert payload["chunk_count"] == 8
assert calls[0]["embedding_provider"] == "provider"
assert calls[0]["reset"] is True
def test_knowledge_base_search_rejects_blank_query(): def test_knowledge_base_search_rejects_blank_query():
payload = search_knowledge_base("") payload = search_knowledge_base("")
@@ -103,6 +163,8 @@ def test_knowledge_base_search_api_returns_payload(client, django_user_model):
def test_knowledge_base_document_crud_api(client, settings, tmp_path, django_user_model): def test_knowledge_base_document_crud_api(client, settings, tmp_path, django_user_model):
settings.MEDIA_ROOT = tmp_path settings.MEDIA_ROOT = tmp_path
settings.REGULATORY_RAG_CHROMA_PATH = tmp_path / "chroma"
settings.REGULATORY_RAG_PROVIDER = "deterministic"
user = django_user_model.objects.create_user(username="owner", password="pass") user = django_user_model.objects.create_user(username="owner", password="pass")
client.force_login(user) client.force_login(user)
@@ -176,6 +238,67 @@ def test_delete_document_removes_managed_chunks_from_index(monkeypatch, django_u
assert deleted_filters == [{"document_id": document.pk}] assert deleted_filters == [{"document_id": document.pk}]
def test_disabling_document_removes_managed_chunks_from_index(monkeypatch, django_user_model):
user = django_user_model.objects.create_user(username="owner", password="pass")
document = KnowledgeBaseDocument.objects.create(
user=user,
display_name="孙之烨简历",
original_name="孙之烨-260510.pdf",
storage_path="knowledge_base/resume.pdf",
file_size=1,
status=KnowledgeBaseDocument.Status.ACTIVE,
is_active=True,
indexed_chunk_count=7,
metadata={"index_status": "indexed", "index_error": ""},
)
deleted_filters = []
class FakeCollection:
def delete(self, where):
deleted_filters.append(where)
monkeypatch.setattr("review_agent.knowledge_base._load_chroma_collection", lambda: FakeCollection())
update_document(document, {"is_active": False})
document.refresh_from_db()
assert document.status == KnowledgeBaseDocument.Status.DISABLED
assert document.is_active is False
assert document.indexed_chunk_count == 0
assert document.metadata["index_status"] == "disabled"
assert deleted_filters == [{"document_id": document.pk}]
def test_inactive_document_manual_index_clears_existing_chunks(monkeypatch, django_user_model):
user = django_user_model.objects.create_user(username="owner", password="pass")
document = KnowledgeBaseDocument.objects.create(
user=user,
display_name="孙之烨简历",
original_name="孙之烨-260510.pdf",
storage_path="knowledge_base/resume.pdf",
file_size=1,
status=KnowledgeBaseDocument.Status.DISABLED,
is_active=False,
indexed_chunk_count=7,
metadata={"index_status": "indexed", "index_error": ""},
)
deleted_filters = []
class FakeCollection:
def delete(self, where):
deleted_filters.append(where)
monkeypatch.setattr("review_agent.knowledge_base._load_chroma_collection", lambda: FakeCollection())
chunk_count = index_managed_document(document)
document.refresh_from_db()
assert chunk_count == 0
assert document.indexed_chunk_count == 0
assert document.metadata["index_status"] == "disabled"
assert deleted_filters == [{"document_id": document.pk}]
def test_knowledge_base_document_api_is_scoped_to_owner(client, django_user_model): def test_knowledge_base_document_api_is_scoped_to_owner(client, django_user_model):
owner = django_user_model.objects.create_user(username="owner", password="pass") owner = django_user_model.objects.create_user(username="owner", password="pass")
other = django_user_model.objects.create_user(username="other", password="pass") other = django_user_model.objects.create_user(username="other", password="pass")
@@ -199,6 +322,8 @@ def test_knowledge_base_document_api_is_scoped_to_owner(client, django_user_mode
def test_knowledge_base_document_manual_index_api(client, settings, tmp_path, django_user_model): def test_knowledge_base_document_manual_index_api(client, settings, tmp_path, django_user_model):
settings.MEDIA_ROOT = tmp_path settings.MEDIA_ROOT = tmp_path
settings.REGULATORY_RAG_CHROMA_PATH = tmp_path / "chroma"
settings.REGULATORY_RAG_PROVIDER = "deterministic"
user = django_user_model.objects.create_user(username="owner", password="pass") user = django_user_model.objects.create_user(username="owner", password="pass")
client.force_login(user) client.force_login(user)
source_path = tmp_path / "manual.md" source_path = tmp_path / "manual.md"

View File

@@ -1,3 +1,5 @@
import sys
import pytest import pytest
from review_agent.regulatory_review.services.rag_citation import ( from review_agent.regulatory_review.services.rag_citation import (
@@ -7,6 +9,7 @@ from review_agent.regulatory_review.services.rag_citation import (
from review_agent.regulatory_review.services.rag_embedding import SiliconFlowEmbeddingProvider from review_agent.regulatory_review.services.rag_embedding import SiliconFlowEmbeddingProvider
from review_agent.regulatory_review.services.rag_index import chunk_text from review_agent.regulatory_review.services.rag_index import chunk_text
from review_agent.regulatory_review.services.rag_index import collect_source_chunks from review_agent.regulatory_review.services.rag_index import collect_source_chunks
from review_agent.regulatory_review.services.rag_index import build_chroma_index
def test_siliconflow_embedding_provider_posts_expected_payload(monkeypatch): def test_siliconflow_embedding_provider_posts_expected_payload(monkeypatch):
@@ -86,3 +89,141 @@ def test_collect_source_chunks_requires_attachment4_extraction(monkeypatch, tmp_
with pytest.raises(RuntimeError, match="附件 4"): with pytest.raises(RuntimeError, match="附件 4"):
collect_source_chunks(source_dir) collect_source_chunks(source_dir)
def test_collect_source_chunks_excludes_demo_agent_materials(monkeypatch, tmp_path):
source_dir = tmp_path / "sources"
source_dir.mkdir()
demo_dir = source_dir / "【模拟题二】试剂盒临床注册文件准备与审核Agent"
demo_dir.mkdir()
(demo_dir / "【模拟题二】试剂盒临床注册文件准备与审核Agent.md").write_text("题目材料", encoding="utf-8")
(source_dir / "【模拟题二】试剂盒临床注册文件准备与审核Agent.docx").write_bytes(b"demo")
real_source = source_dir / "附件 4 体外诊断试剂注册申报资料要求及说明.doc"
real_source.write_bytes(b"rule")
def fake_extract(path):
return "附件4 正文" if path == real_source else "不应被抽取"
monkeypatch.setattr("review_agent.regulatory_review.services.rag_index.extract_text_from_path", fake_extract)
chunks = collect_source_chunks(source_dir)
assert chunks
assert all("模拟题二" not in chunk.metadata["source"] for chunk in chunks)
def test_build_chroma_index_reset_recreates_collection_without_deleting_index_dir(settings, monkeypatch, tmp_path):
settings.MEDIA_ROOT = tmp_path
persist_path = tmp_path / "chroma"
persist_path.mkdir()
stale_file = persist_path / "chroma.sqlite3"
stale_file.write_text("stale", encoding="utf-8")
source_dir = tmp_path / "sources"
source_dir.mkdir()
(source_dir / "rule.md").write_text("注册检验报告要求", encoding="utf-8")
client_states = []
deleted_collections = []
class FakeCollection:
def upsert(self, **kwargs):
return None
class FakeClient:
def __init__(self, path):
client_states.append({"path": path, "stale_exists": stale_file.exists()})
def delete_collection(self, name):
deleted_collections.append(name)
def get_or_create_collection(self, name):
return FakeCollection()
class FakeSharedSystemClient:
@staticmethod
def clear_system_cache():
client_states.append({"path": "cache-cleared", "stale_exists": stale_file.exists()})
monkeypatch.setitem(sys.modules, "chromadb", type("FakeChromaModule", (), {"PersistentClient": FakeClient}))
monkeypatch.setitem(
sys.modules,
"chromadb.api.shared_system_client",
type("FakeSharedSystemClientModule", (), {"SharedSystemClient": FakeSharedSystemClient}),
)
count = build_chroma_index(
source_dir=source_dir,
embedding_provider=lambda texts: [[0.1, 0.2] for _ in texts],
persist_path=persist_path,
collection_name="test",
reset=True,
)
assert count == 1
assert client_states == [
{"path": str(persist_path), "stale_exists": True},
{"path": "cache-cleared", "stale_exists": True},
{"path": str(persist_path), "stale_exists": True},
]
assert stale_file.exists()
assert deleted_collections == ["test"]
def test_build_chroma_index_reset_clears_bad_index_dir_after_chroma_cache_reset(settings, monkeypatch, tmp_path):
settings.MEDIA_ROOT = tmp_path
persist_path = tmp_path / "chroma"
persist_path.mkdir()
stale_file = persist_path / "chroma.sqlite3"
stale_file.write_text("stale", encoding="utf-8")
source_dir = tmp_path / "sources"
source_dir.mkdir()
(source_dir / "rule.md").write_text("注册检验报告要求", encoding="utf-8")
events = []
class FakeCollection:
def upsert(self, **kwargs):
return None
class BrokenThenFreshClient:
attempts = 0
def __init__(self, path):
BrokenThenFreshClient.attempts += 1
events.append(("client", BrokenThenFreshClient.attempts, stale_file.exists()))
if BrokenThenFreshClient.attempts == 1:
raise ValueError("Could not connect to tenant default_tenant")
def get_or_create_collection(self, name):
return FakeCollection()
class FakeSharedSystemClient:
@staticmethod
def clear_system_cache():
events.append(("clear_cache", stale_file.exists()))
fake_chromadb = type(
"FakeChromaModule",
(),
{"PersistentClient": BrokenThenFreshClient},
)
monkeypatch.setitem(sys.modules, "chromadb", fake_chromadb)
monkeypatch.setitem(
sys.modules,
"chromadb.api.shared_system_client",
type("FakeSharedSystemClientModule", (), {"SharedSystemClient": FakeSharedSystemClient}),
)
count = build_chroma_index(
source_dir=source_dir,
embedding_provider=lambda texts: [[0.1, 0.2] for _ in texts],
persist_path=persist_path,
collection_name="test",
reset=True,
)
assert count == 1
assert events == [
("client", 1, True),
("clear_cache", True),
("client", 2, False),
]
assert not stale_file.exists()