feat(regulatory-info-package): 实现材料包生成工作流

This commit is contained in:
2026-06-10 19:49:44 +08:00
parent f0286264e2
commit dac8ce3c14
21 changed files with 1512 additions and 0 deletions

View File

@@ -0,0 +1,2 @@
"""Services for the regulatory information package workflow."""

View File

@@ -0,0 +1,70 @@
from __future__ import annotations
from pathlib import Path
from docx import Document
from docx.enum.text import WD_COLOR_INDEX
from docx.shared import RGBColor
from review_agent.regulatory_info_package.schemas import MergedField
def write_docx_from_template(
    source_path: str | Path,
    output_path: str | Path,
    merged_fields: dict[str, MergedField],
) -> tuple[int, int, int]:
    """Fill a docx template with merged field values and append a summary table.

    Every body paragraph containing a ``{{key}}`` placeholder is rewritten with
    the field value. A page break, a heading and a four-column table listing
    all merged fields are then appended, and the document is saved.

    Args:
        source_path: Template to open; a blank document is used if it is absent.
        output_path: Destination file; parent directories are created.
        merged_fields: Field key -> merged field to write.

    Returns:
        ``(highlight_count, missing_count, llm_only_count)``, accumulated over
        both the replaced paragraphs and the appended table rows.
    """
    source = Path(source_path)
    output = Path(output_path)
    output.parent.mkdir(parents=True, exist_ok=True)
    document = Document(source) if source.exists() else Document()

    # Placeholders are written as {{key}} inside the template text.
    replacements = {f"{{{{{key}}}}}": field for key, field in merged_fields.items()}
    counts = {"highlight": 0, "missing": 0, "llm_only": 0}

    def tally(field: MergedField) -> None:
        # One place for the counting rules used by both loops below.
        reason = field.highlight_reason
        if reason != "none":
            counts["highlight"] += 1
        if reason == "missing":
            counts["missing"] += 1
        if reason == "llm_only":
            counts["llm_only"] += 1

    for paragraph in document.paragraphs:
        for placeholder, field in replacements.items():
            if placeholder in paragraph.text:
                _replace_paragraph_text(paragraph, paragraph.text.replace(placeholder, field.value), field)
                tally(field)

    document.add_page_break()
    heading = document.add_paragraph()
    heading_run = heading.add_run("预生成字段")
    heading_run.bold = True

    table = document.add_table(rows=1, cols=4)
    header_cells = table.rows[0].cells
    header_cells[0].text = "字段"
    # The value column previously had an empty header.
    header_cells[1].text = "值"
    header_cells[2].text = "来源"
    header_cells[3].text = "待确认"
    for field in merged_fields.values():
        cells = table.add_row().cells
        cells[0].text = field.label
        cells[1].text = field.value
        cells[2].text = field.source
        # Both branches used to be empty strings, leaving the column blank.
        cells[3].text = "是" if field.needs_review else "否"
        tally(field)

    document.save(output)
    return counts["highlight"], counts["missing"], counts["llm_only"]
def _replace_paragraph_text(paragraph, text: str, field: MergedField) -> None:
    """Replace the paragraph content with ``text`` in a single new run.

    Existing runs are blanked rather than removed. The new run is highlighted
    yellow when the field has any highlight reason, and additionally coloured
    red when the reason is ``"conflict"``.
    """
    for existing_run in paragraph.runs:
        existing_run.text = ""
    new_run = paragraph.add_run(text)

    reason = field.highlight_reason
    if reason == "none":
        return
    new_run.font.highlight_color = WD_COLOR_INDEX.YELLOW
    if reason == "conflict":
        new_run.font.color.rgb = RGBColor(255, 0, 0)

View File

@@ -0,0 +1,135 @@
from __future__ import annotations
import json
import re
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Callable
from review_agent.llm import generate_completion
from review_agent.regulatory_info_package.schemas import InstructionExtractResult
# Rule-based extraction table: field key -> (display label, regex).
# Group 1 of each regex is the raw value. The separator class accepts both the
# ASCII colon and the full-width colon, so the captured value no longer starts
# with the separator when the label is written as "label:value".
# NOTE(review): extract_fields_by_rules applies these with re.IGNORECASE, so
# the standard_no prefixes also match lowercase text.
FIELD_PATTERNS = {
    "product_name": ("产品名称", r"产品名称[::\s]*([^\n\r]+)"),
    "storage_condition": ("储存条件", r"(?:储存条件|贮存条件|保存条件)[::\s]*([^\n\r]+)"),
    "intended_use": ("预期用途", r"预期用途[::\s]*([^\n\r]+)"),
    "package_specification": ("包装规格", r"(?:包装规格|规格)[::\s]*([^\n\r]+)"),
    "sample_type": ("样本类型", r"样本类型[::\s]*([^\n\r]+)"),
    "applicable_instrument": ("适用仪器", r"适用仪器[::\s]*([^\n\r]+)"),
    "standard_no": ("标准号", r"((?:GB|YY|WS|T/C[A-Z0-9]*)[ /T0-9.\-—]+)"),
}
def extract_fields_by_rules(instruction: InstructionExtractResult) -> dict[str, dict]:
    """Extract known fields from the parsed instruction using fixed rules.

    For each entry of ``FIELD_PATTERNS`` the paragraph following a label-only
    paragraph is preferred (confidence 0.82). Otherwise the regex is searched
    case-insensitively over the front text, paragraphs and section bodies
    (confidence 0.75). Fields with no usable value are omitted.
    """
    corpus = "\n".join([instruction.front_text, *instruction.paragraphs, *instruction.sections.values()])
    extracted: dict[str, dict] = {}
    for key, (label, pattern) in FIELD_PATTERNS.items():
        following_value = _value_after_label_paragraph(instruction.paragraphs, label)
        if following_value:
            extracted[key] = {
                "label": label,
                "value": following_value,
                "evidence": f"{label}\n{following_value}",
                "confidence": 0.82,
                "source": "rule",
            }
            continue

        hit = re.search(pattern, corpus, flags=re.IGNORECASE)
        if hit is None:
            continue
        value = _clean_value(hit.group(1))
        if not value:
            continue
        extracted[key] = {
            "label": label,
            "value": value,
            # Evidence is capped at 240 characters of the matched text.
            "evidence": hit.group(0)[:240],
            "confidence": 0.75,
            "source": "rule",
        }
    return extracted
def extract_fields_with_llm(instruction: InstructionExtractResult) -> dict[str, dict]:
    """Ask the LLM to extract the fields and return the dict-valued entries.

    Only the first 6000 characters of ``front_text`` are sent. Entries of the
    parsed JSON object whose value is not a dict are dropped.
    """
    task_description = "".join(
        (
            "请从体外诊断试剂产品说明书中抽取字段,输出 JSON 对象,字段包括 ",
            "product_name、storage_condition、intended_use、package_specification、sample_type、applicable_instrument、standard_no。",
            "每个字段值为 {label,value,evidence,confidence}。\n\n",
        )
    )
    messages = [{"role": "user", "content": task_description + instruction.front_text[:6000]}]
    raw = generate_completion(messages, temperature=0.0)

    fields: dict[str, dict] = {}
    for key, value in _parse_json_object(raw).items():
        if isinstance(value, dict):
            fields[key] = value
    return fields
def run_llm_extract_with_retry(
    instruction: InstructionExtractResult,
    *,
    llm_extract_func: Callable[[InstructionExtractResult], dict[str, dict]] | None = None,
    sleep_func: Callable[[float], None] = time.sleep,
    delays: tuple[float, ...] = (0, 1, 2),
) -> dict[str, dict]:
    """Run the LLM extraction, retrying on any exception.

    Args:
        instruction: Parsed instruction passed to the extractor.
        llm_extract_func: Extractor to call; defaults to ``extract_fields_with_llm``.
        sleep_func: Called with each positive delay before the attempt.
        delays: Seconds to wait before each attempt; one attempt per entry.
            The default keeps the previous schedule of three attempts with
            waits of 0, 1 and 2 seconds. An empty value means one immediate
            attempt.

    Returns:
        The result of the first successful attempt.

    Raises:
        Exception: The error of the last attempt when every attempt fails.
    """
    func = llm_extract_func or extract_fields_with_llm
    # Guarantee at least one attempt so a result or an error always exists.
    schedule = tuple(delays) or (0,)
    last_exc: Exception | None = None
    for delay in schedule:
        if delay > 0:
            sleep_func(delay)
        try:
            return func(instruction)
        except Exception as exc:  # boundary: any extractor failure is retried
            last_exc = exc
    # The schedule is never empty, so reaching here means all attempts failed.
    assert last_exc is not None
    raise last_exc
def run_parallel_extract(
    instruction: InstructionExtractResult,
    *,
    llm_extract_func: Callable[[InstructionExtractResult], dict[str, dict]] | None = None,
) -> dict:
    """Run rule-based and LLM extraction concurrently.

    Returns a dict with ``regex_results``, ``llm_results`` and ``llm_error``.
    A failure of the LLM task is recorded as its message in ``llm_error``;
    a failure of the rule task propagates to the caller.
    """
    with ThreadPoolExecutor(max_workers=2) as pool:
        rules_task = pool.submit(extract_fields_by_rules, instruction)
        llm_task = pool.submit(run_llm_extract_with_retry, instruction, llm_extract_func=llm_extract_func)
        regex_results = rules_task.result()
        try:
            llm_results, llm_error = llm_task.result(), ""
        except Exception as exc:
            llm_results, llm_error = {}, str(exc)
    return {"regex_results": regex_results, "llm_results": llm_results, "llm_error": llm_error}
def save_field_extract_result(path: str | Path, payload: dict) -> Path:
    """Write ``payload`` as indented UTF-8 JSON to ``path`` and return the path.

    Parent directories are created when missing; non-ASCII text is kept as-is.
    """
    destination = Path(path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(payload, ensure_ascii=False, indent=2)
    destination.write_text(serialized, encoding="utf-8")
    return destination
def _clean_value(value: str) -> str:
    """Normalise a raw captured value to its first clause.

    Leading whitespace, closing brackets and colons are removed first. These
    are residue of a bracketed label such as ``【产品名称】`` being captured by
    the regex fallback. The text is then cut at the first Chinese full stop or
    semicolon (full-width or ASCII).

    Returns:
        The cleaned value, or ``""`` when nothing but residue remains.
    """
    cleaned = re.sub(r"^[\s】\]::]+", "", value).strip()
    if not cleaned:
        return ""
    return re.split(r"[。;;]", cleaned)[0].strip()
def _value_after_label_paragraph(paragraphs: list[str], label: str) -> str:
    """Return the cleaned paragraph that follows a label-only paragraph.

    A paragraph counts as a label heading when, after stripping, it equals the
    bare label, ``【label】`` or ``[label]``. The set previously contained the
    bare label twice and never the full-width bracket form. Only the first
    heading that has a following paragraph is considered.

    Returns:
        The cleaned following paragraph, or ``""`` when no heading is found.
    """
    headings = {label, f"【{label}】", f"[{label}]"}
    # The last paragraph has no successor, so it can never act as a heading.
    for index, text in enumerate(paragraphs[:-1]):
        if text.strip() in headings:
            return _clean_value(paragraphs[index + 1])
    return ""
def _parse_json_object(raw: str) -> dict:
    """Parse the outermost ``{...}`` span of an LLM reply as JSON.

    A leading Markdown code fence and an optional ``json`` tag are removed
    first. Returns ``{}`` when the text contains no opening or no closing
    brace; malformed JSON inside the braces raises from ``json.loads``.
    """
    candidate = (raw or "").strip()
    if candidate.startswith("```"):
        candidate = candidate.strip("`").strip()
        if candidate.lower().startswith("json"):
            candidate = candidate[4:].strip()

    opening = candidate.find("{")
    closing = candidate.rfind("}")
    if -1 in (opening, closing):
        return {}
    return json.loads(candidate[opening : closing + 1])

View File

@@ -0,0 +1,115 @@
from __future__ import annotations
import json
from pathlib import Path
from review_agent.regulatory_info_package.schemas import MergedField
# Field key -> display label for fields that merge_fields always emits.
# A key listed here that neither extractor produced is merged as "missing".
REQUIRED_FIELDS = {
    "product_name": "产品名称",
    "applicant_name": "申请人名称",
    "package_specification": "包装规格",
    "intended_use": "预期用途",
    "storage_condition": "储存条件",
}
def _coerce_confidence(raw: object) -> float:
    """Convert an extractor-supplied confidence to float, falling back to 0.0."""
    try:
        return float(raw or 0.0)
    except (TypeError, ValueError):
        # LLM output is free-form; values such as "高" must not abort the merge.
        return 0.0


def merge_fields(rule_results: dict[str, dict], llm_results: dict[str, dict]) -> tuple[dict[str, MergedField], dict[str, list[dict]]]:
    """Merge rule-based and LLM extraction results field by field.

    Keys are the union of ``REQUIRED_FIELDS`` and both result sets, processed
    in sorted order. For each key:

    * both values present and different -> rule value wins, marked "conflict";
    * rule value present -> taken as-is;
    * only LLM value present -> taken, marked "llm_only";
    * neither -> value "/" is written, marked "missing".

    Entries that are not dicts are treated as absent, and non-numeric
    confidences become 0.0.

    Returns:
        ``(merged, summary)`` where ``summary`` holds the ``missing_fields``,
        ``llm_only_fields`` and ``conflict_fields`` review lists.
    """
    merged: dict[str, MergedField] = {}
    missing_fields: list[dict] = []
    llm_only_fields: list[dict] = []
    conflict_fields: list[dict] = []

    keys = set(REQUIRED_FIELDS) | set(rule_results) | set(llm_results)
    for key in sorted(keys):
        rule = rule_results.get(key)
        if not isinstance(rule, dict):
            rule = {}
        llm = llm_results.get(key)
        if not isinstance(llm, dict):
            llm = {}
        rule_value = str(rule.get("value") or "").strip()
        llm_value = str(llm.get("value") or "").strip()
        label = str(rule.get("label") or llm.get("label") or REQUIRED_FIELDS.get(key) or key)

        if rule_value and llm_value and rule_value != llm_value:
            field = MergedField(
                key=key,
                label=label,
                value=rule_value,
                source="rule_conflict",
                evidence=str(rule.get("evidence") or ""),
                confidence=_coerce_confidence(rule.get("confidence")),
                highlight_reason="conflict",
                needs_review=True,
                rule_value=rule_value,
                llm_value=llm_value,
            )
            conflict_fields.append(
                {
                    "field_key": key,
                    "field_label": label,
                    "rule_value": rule_value,
                    "llm_value": llm_value,
                    "selected_value": rule_value,
                    "handling": "规则优先,写入值高亮并进入追溯清单",
                }
            )
        elif rule_value:
            field = MergedField(
                key=key,
                label=label,
                value=rule_value,
                source="rule",
                evidence=str(rule.get("evidence") or ""),
                confidence=_coerce_confidence(rule.get("confidence")),
            )
        elif llm_value:
            field = MergedField(
                key=key,
                label=label,
                value=llm_value,
                source="llm",
                evidence=str(llm.get("evidence") or ""),
                confidence=_coerce_confidence(llm.get("confidence")),
                highlight_reason="llm_only",
                needs_review=True,
                llm_value=llm_value,
            )
            llm_only_fields.append(_review_dict(field))
        else:
            field = MergedField(
                key=key,
                label=label,
                value="/",
                source="missing",
                evidence="",
                confidence=0.0,
                highlight_reason="missing",
                needs_review=True,
            )
            missing_fields.append(_review_dict(field))
        merged[key] = field

    return merged, {
        "missing_fields": missing_fields,
        "llm_only_fields": llm_only_fields,
        "conflict_fields": conflict_fields,
    }
def save_merged_fields(path: str | Path, merged: dict[str, MergedField], summary: dict[str, list[dict]]) -> Path:
    """Write the merged fields and the review summary to a JSON file.

    The document holds each field's ``__dict__`` under ``"fields"``, followed
    by the summary entries at top level. Parent directories are created.

    Returns:
        The path that was written.
    """
    destination = Path(path)
    destination.parent.mkdir(parents=True, exist_ok=True)

    serializable_fields = {}
    for key, merged_field in merged.items():
        serializable_fields[key] = merged_field.__dict__
    document = {"fields": serializable_fields}
    document.update(summary)

    destination.write_text(json.dumps(document, ensure_ascii=False, indent=2), encoding="utf-8")
    return destination
def _review_dict(field: MergedField) -> dict:
    """Build the review-list entry for a merged field.

    ``target_file`` is always written as an empty string here.
    """
    return dict(
        target_file="",
        field_key=field.key,
        field_label=field.label,
        final_value=field.value,
        highlight_reason=field.highlight_reason,
        needs_review=field.needs_review,
    )

View File

@@ -0,0 +1,105 @@
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
from review_agent.models import Conversation, FileAttachment, FileSummaryBatch, FileSummaryItem
@dataclass
class InstructionInputSelection:
    """Outcome of choosing the instruction document for a conversation.

    ``status`` is set by the selection helpers in this module to "selected",
    "waiting_user" or "missing". The remaining fields are filled only where
    they apply to that outcome.
    """

    status: str
    # Name and storage location of the chosen file (set when selected).
    file_name: str = ""
    storage_path: str = ""
    # Set when the file was chosen from the conversation's attachments.
    attachment: FileAttachment | None = None
    # Set when the choice came from a file-summary batch.
    source_summary_batch: FileSummaryBatch | None = None
    source_summary_item_id: int | None = None
    # File names offered to the user when the choice is ambiguous.
    candidates: list[str] = field(default_factory=list)
    # User-facing prompt or error text.
    message: str = ""
def select_instruction_input(conversation: Conversation, message: str) -> InstructionInputSelection:
    """Pick the instruction docx to use for this conversation.

    Resolution order over the active docx attachments:

    1. exactly one attachment named in ``message``;
    2. exactly one attachment whose name contains "说明书";
    3. exactly one attachment overall;
    4. several attachments -> ask the user, listing the candidates.

    With no attachments at all, the latest successful summary batch is
    consulted; failing that, the result has status "missing".
    """
    candidates = _active_docx_attachments(conversation)

    named = _match_by_message(candidates, message)
    if len(named) == 1:
        return _selection_from_attachment(named[0])

    instruction_candidates = [item for item in candidates if "说明书" in item.original_name]
    if len(instruction_candidates) == 1:
        return _selection_from_attachment(instruction_candidates[0])
    if len(candidates) == 1:
        return _selection_from_attachment(candidates[0])

    # instruction_candidates is a subset of candidates, so one check suffices.
    if len(candidates) > 1:
        names = [item.original_name for item in (instruction_candidates or candidates)]
        return InstructionInputSelection(
            status="waiting_user",
            candidates=names,
            # Names were previously joined with "", which fused them together.
            message="请确认用于生成第1章监管信息的说明书文件名:" + "、".join(names),
        )

    summary_selection = _select_from_latest_summary(conversation, message)
    if summary_selection:
        return summary_selection
    return InstructionInputSelection(status="missing", message="请先上传产品说明书 docx 文件。")
def _active_docx_attachments(conversation: Conversation) -> list[FileAttachment]:
    """List the conversation's active, non-deleted ``.docx`` attachments.

    Results are ordered by file name, then by descending version number.
    """
    active = FileAttachment.objects.filter(
        conversation=conversation,
        is_active=True,
    )
    not_deleted = active.exclude(upload_status=FileAttachment.UploadStatus.DELETED)
    docx_only = not_deleted.filter(original_name__iendswith=".docx")
    return list(docx_only.order_by("original_name", "-version_no"))
def _match_by_message(candidates: list[FileAttachment], message: str) -> list[FileAttachment]:
    """Return the attachments whose file name or stem appears in ``message``.

    Comparison is case-insensitive and ignores all whitespace on both sides.
    Previously only the message was compacted, so a file name containing
    spaces could never match.
    """

    def compact(text: str) -> str:
        return "".join((text or "").lower().split())

    haystack = compact(message)
    matched = []
    for attachment in candidates:
        name = compact(attachment.original_name)
        stem = compact(Path(attachment.original_name).stem)
        # Empty needles are skipped: "" is contained in every string.
        if (stem and stem in haystack) or (name and name in haystack):
            matched.append(attachment)
    return matched
def _selection_from_attachment(attachment: FileAttachment) -> InstructionInputSelection:
    """Wrap an attachment in a "selected" result carrying its name and path."""
    details = {
        "status": "selected",
        "file_name": attachment.original_name,
        "storage_path": attachment.storage_path,
        "attachment": attachment,
    }
    return InstructionInputSelection(**details)
def _select_from_latest_summary(conversation: Conversation, message: str) -> InstructionInputSelection | None:
    """Pick the instruction docx from the latest successful summary batch.

    Items named in ``message`` take precedence; otherwise items whose file
    name contains "说明书" are considered. Name matching is case-insensitive
    and ignores whitespace on both sides.

    Returns:
        A "selected" result for exactly one candidate, a "waiting_user" result
        for several, or ``None`` when there is no batch or no candidate.
    """
    batch = (
        FileSummaryBatch.objects.filter(conversation=conversation, status=FileSummaryBatch.Status.SUCCESS)
        .order_by("-finished_at", "-created_at", "-id")
        .first()
    )
    if not batch:
        return None

    def compact(text: str) -> str:
        return "".join((text or "").lower().split())

    items = list(batch.items.filter(file_name__iendswith=".docx").order_by("file_name", "id"))
    haystack = compact(message)
    # File names are compacted as well, so names with spaces can still match.
    named = [
        item
        for item in items
        if compact(Path(item.file_name).stem) in haystack or compact(item.file_name) in haystack
    ]
    candidates = named or [item for item in items if "说明书" in item.file_name]

    if len(candidates) == 1:
        item = candidates[0]
        return InstructionInputSelection(
            status="selected",
            file_name=item.file_name,
            storage_path=item.storage_path,
            source_summary_batch=batch,
            source_summary_item_id=item.pk,
        )
    if len(candidates) > 1:
        names = [item.file_name for item in candidates]
        return InstructionInputSelection(
            status="waiting_user",
            source_summary_batch=batch,
            candidates=names,
            # Names were previously joined with "", which fused them together.
            message="请确认用于生成第1章监管信息的说明书文件名:" + "、".join(names),
        )
    return None

View File

@@ -0,0 +1,77 @@
from __future__ import annotations
import json
from pathlib import Path
from docx import Document
from review_agent.regulatory_info_package.schemas import InstructionExtractResult
def parse_instruction_docx(path: str | Path) -> InstructionExtractResult:
    """Read an instruction docx into paragraphs, sections and tables.

    Blank paragraphs are dropped and the rest stripped. Table cells have runs
    of whitespace collapsed to single spaces. ``front_text`` is the first 30
    non-blank paragraphs joined by newlines.
    """
    file_path = Path(path)
    document = Document(file_path)

    paragraphs = []
    for paragraph in document.paragraphs:
        stripped = paragraph.text.strip()
        if stripped:
            paragraphs.append(stripped)

    tables = []
    for table in document.tables:
        rows = [[" ".join(cell.text.split()) for cell in row.cells] for row in table.rows]
        if rows:
            tables.append(rows)

    return InstructionExtractResult(
        source_file_name=file_path.name,
        paragraphs=paragraphs,
        sections=_build_sections(paragraphs),
        tables=tables,
        component_tables=_component_tables(tables),
        front_text="\n".join(paragraphs[:30]),
    )
def save_instruction_extract_json(path: str | Path, result: InstructionExtractResult) -> Path:
    """Dump the parsed instruction to an indented UTF-8 JSON file.

    Parent directories are created when missing.

    Returns:
        The path that was written.
    """
    exported_attributes = (
        "source_file_name",
        "paragraphs",
        "sections",
        "tables",
        "component_tables",
        "front_text",
    )
    destination = Path(path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    payload = {name: getattr(result, name) for name in exported_attributes}
    destination.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
    return destination
def _build_sections(paragraphs: list[str]) -> dict[str, str]:
    """Group paragraphs under the most recent heading.

    Text before the first heading goes under ``"front"``. Heading keys are
    truncated to 80 characters, and headings with no body text are left out.
    """
    grouped: dict[str, list[str]] = {}
    heading = "front"
    for line in paragraphs:
        if _looks_like_heading(line):
            heading = line[:80]
            if heading not in grouped:
                grouped[heading] = []
        else:
            grouped.setdefault(heading, []).append(line)

    sections: dict[str, str] = {}
    for title, body in grouped.items():
        if body:
            sections[title] = "\n".join(body).strip()
    return sections
def _looks_like_heading(text: str) -> bool:
    """Return True when a paragraph looks like a section heading.

    A heading is at most 40 characters long and starts with one of the known
    markers: a Chinese ordinal ("一、" to "六、"), an opening "【" bracket, or
    one of the listed field labels.
    """
    compact = text.strip()
    if len(compact) > 40:
        return False
    # The tuple used to contain "", which str.startswith accepts for every
    # string, so every short paragraph was classified as a heading.
    heading_markers = ("一、", "二、", "三、", "四、", "五、", "六、", "【", "产品名称", "预期用途", "主要组成")
    return compact.startswith(heading_markers)
def _component_tables(tables: list[list[list[str]]]) -> list[dict]:
    """Select tables whose header row mentions a component keyword.

    Each selected table is returned as ``{"header": first_row, "rows": rest}``.
    """
    keywords = ("组成", "组分", "成分")
    selected = []
    for table in tables:
        header = table[0] if table else []
        header_text = "".join(header)
        for keyword in keywords:
            if keyword in header_text:
                selected.append({"header": header, "rows": table[1:]})
                break
    return selected

View File

@@ -0,0 +1,52 @@
from __future__ import annotations
import shutil
from dataclasses import dataclass
from pathlib import Path
from docx import Document
from review_agent.regulatory_info_package.schemas import MergedField
@dataclass(frozen=True)
class LegacyDocCapability:
    """Immutable result of probing for native ``.doc`` writing support."""

    # "available" or "unavailable", as set by detect_legacy_doc_capability.
    status: str
    # Name of the adapter that corresponds to the detected status.
    adapter: str
    # Human-readable detail about the probe outcome.
    message: str = ""
def detect_legacy_doc_capability() -> LegacyDocCapability:
    """Probe whether ``win32com.client`` can be imported.

    Any exception raised by the import is reported as "unavailable", with the
    exception type name included in the message.
    """
    try:
        import win32com.client  # noqa: F401
    except Exception as exc:
        return LegacyDocCapability(
            status="unavailable",
            adapter="UnavailableLegacyDocAdapter",
            message=f"Word COM 不可用:{type(exc).__name__}",
        )
    return LegacyDocCapability(status="available", adapter="WordComDocAdapter", message="Word COM 可用")
def write_legacy_doc_or_fallback(
    source_path: str | Path,
    output_path: str | Path,
    merged_fields: dict[str, MergedField],
) -> tuple[Path, str, dict]:
    """Produce the legacy ``.doc`` output, or a docx stand-in.

    When Word COM is importable and the template exists, the template is
    copied to ``output_path`` unchanged. Otherwise a new docx is written next
    to ``output_path`` (same stem, ``.docx`` suffix) with one line per field.

    Returns:
        ``(written_path, status, summary)`` where ``status`` is "success" or
        "fallback_success" and ``summary`` holds the capability probe result
        and a ``fallback_used`` flag.
    """
    source = Path(source_path)
    output = Path(output_path)
    output.parent.mkdir(parents=True, exist_ok=True)
    capability = detect_legacy_doc_capability()

    if capability.status == "available" and source.exists():
        # NOTE(review): this branch only copies the template; merged_fields is
        # not written into the .doc. Confirm whether that is intended.
        shutil.copy2(source, output)
        return output, "success", {"doc": capability.__dict__, "fallback_used": False}

    fallback = output.with_suffix(".docx")
    document = Document()
    document.add_heading(output.stem, level=1)
    document.add_paragraph("当前环境未检测到可用的 .doc 原生写入能力,已生成 docx 兜底文件。")
    for field in merged_fields.values():
        # Label and value used to be concatenated with nothing between them.
        document.add_paragraph(f"{field.label}:{field.value}")
    document.save(fallback)
    return fallback, "fallback_success", {"doc": capability.__dict__, "fallback_used": True}

View File

@@ -0,0 +1,65 @@
from __future__ import annotations
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from review_agent.models import RegulatoryInfoPackageBatch
from review_agent.regulatory_info_package.constants import GENERATED_FILE_FAILED
from review_agent.regulatory_info_package.schemas import GeneratedFileResult, MergedField, TemplateSpec
from review_agent.regulatory_info_package.services.docx_document import write_docx_from_template
from review_agent.regulatory_info_package.services.legacy_doc_document import write_legacy_doc_or_fallback
from review_agent.regulatory_info_package.services.template_repository import copy_template_to_batch, template_specs
from review_agent.regulatory_info_package.storage import ensure_batch_subdir
def generate_package_documents(
    batch: RegulatoryInfoPackageBatch,
    config: dict,
    merged_fields: dict[str, MergedField],
) -> list[GeneratedFileResult]:
    """Generate every configured template concurrently.

    Up to four worker threads are used. Results are returned in the order of
    the template configuration. They used to be collected with
    ``as_completed``, which made the order vary between runs.
    """
    specs = template_specs(config)
    if not specs:
        return []
    with ThreadPoolExecutor(max_workers=min(4, len(specs))) as executor:
        futures = [executor.submit(_generate_one, batch, config, spec, merged_fields) for spec in specs]
        # Reading futures in submission order keeps the output deterministic;
        # the pool still runs them in parallel.
        return [future.result() for future in futures]
def _generate_one(
    batch: RegulatoryInfoPackageBatch,
    config: dict,
    spec: TemplateSpec,
    merged_fields: dict[str, MergedField],
) -> GeneratedFileResult:
    """Generate one template's output file and describe the outcome.

    ``doc`` templates go through the legacy writer, which may fall back to
    docx, and report zero counts. All other templates are written as docx.
    Any exception is turned into a failed result carrying its message.
    """
    try:
        template_path = copy_template_to_batch(batch, config, spec)
        output_path = ensure_batch_subdir(batch, "generated") / spec.output_name

        if spec.file_format == "doc":
            actual_path, status, _adapter_summary = write_legacy_doc_or_fallback(
                template_path, output_path, merged_fields
            )
            actual_format = actual_path.suffix.lower().lstrip(".")
            highlight_count, missing_count, llm_only_count = 0, 0, 0
        else:
            counts = write_docx_from_template(template_path, output_path, merged_fields)
            highlight_count, missing_count, llm_only_count = counts
            actual_path, actual_format, status = output_path, "docx", "success"

        return GeneratedFileResult(
            template_code=spec.code,
            file_name=actual_path.name,
            requested_format=spec.file_format,
            actual_format=actual_format,
            status=status,
            path=str(actual_path),
            highlight_count=highlight_count,
            missing_count=missing_count,
            llm_only_count=llm_only_count,
        )
    except Exception as exc:
        # Boundary: one failing template must not abort the whole package.
        return GeneratedFileResult(
            template_code=spec.code,
            file_name=spec.output_name,
            requested_format=spec.file_format,
            actual_format=spec.file_format,
            status=GENERATED_FILE_FAILED,
            error_message=str(exc),
        )

View File

@@ -0,0 +1,12 @@
from __future__ import annotations
def build_assistant_summary(*, batch_no: str, exports: list[dict], failed_files: list[dict]) -> str:
    """Build the Markdown message announcing a finished package batch.

    The message has a headline with the batch number and a blank line, then
    one download link per export with zip exports listed first, then one line
    per failed file.
    """
    zip_exports = []
    other_exports = []
    for item in exports:
        is_zip = item.get("export_type") == "zip" or str(item.get("file_name", "")).endswith(".zip")
        (zip_exports if is_zip else other_exports).append(item)

    lines = [f"已完成第1章监管信息材料包生成批次号{batch_no}", ""]
    lines.extend(f"- [{export['file_name']}]({export['download_url']})" for export in zip_exports + other_exports)
    lines.extend(
        f"- {failed.get('file_name')}:生成失败,{failed.get('error_message') or '原因待查看'}"
        for failed in failed_files
    )
    return "\n".join(lines)

View File

@@ -0,0 +1,54 @@
from __future__ import annotations
import hashlib
from pathlib import Path
import yaml
from django.conf import settings
# Default template configuration file: templates/ under the directory one
# level above the directory that contains this module.
CONFIG_PATH = Path(__file__).resolve().parents[1] / "templates" / "regulatory_info_package_templates_v1.yaml"
def load_template_config(path: str | Path | None = None) -> dict:
    """Load the template YAML configuration.

    Uses ``CONFIG_PATH`` when ``path`` is not given. A non-empty
    ``source_dir`` is resolved against ``settings.BASE_DIR`` and stored as an
    absolute path string. An empty file yields ``{}``.
    """
    config_path = CONFIG_PATH if not path else Path(path)
    with config_path.open("r", encoding="utf-8") as handle:
        payload = yaml.safe_load(handle) or {}

    configured_source = payload.get("source_dir")
    if configured_source:
        absolute_source = (Path(settings.BASE_DIR) / configured_source).resolve()
        payload["source_dir"] = str(absolute_source)
    return payload
def compute_config_hash(path: str | Path | None = None) -> str:
    """Return the SHA-256 hex digest of the configuration file's bytes.

    Uses ``CONFIG_PATH`` when ``path`` is not given.
    """
    config_path = CONFIG_PATH if not path else Path(path)
    return hashlib.sha256(config_path.read_bytes()).hexdigest()
def validate_template_config(config: dict) -> list[str]:
    """Validate the template configuration and return error messages.

    Checks that ``source_dir`` is set and exists, that exactly 7 templates are
    configured, that template codes are non-empty and unique, and that each
    template names an existing ``source_file`` and an ``output_name``.

    Returns:
        A list of error messages; empty when the configuration is valid.
    """
    errors: list[str] = []

    # An unset source_dir used to become Path(""), i.e. the current directory,
    # which exists, so the problem was never reported.
    raw_source_dir = str(config.get("source_dir") or "").strip()
    source_dir = Path(raw_source_dir) if raw_source_dir else None
    if source_dir is None:
        errors.append("模板配置缺少 source_dir。")
    elif not source_dir.exists():
        errors.append(f"模板源目录不存在:{source_dir}")
    source_dir_usable = source_dir is not None and source_dir.exists()

    templates = config.get("templates") or []
    if len(templates) != 7:
        errors.append("第1章监管信息模板配置必须包含 7 个模板。")

    seen: set[str] = set()
    for template in templates:
        code = str(template.get("code") or "")
        if not code:
            errors.append("模板 code 不能为空。")
        elif code in seen:
            errors.append(f"模板 code 重复:{code}")
        seen.add(code)

        source_file = str(template.get("source_file") or "")
        output_name = str(template.get("output_name") or "")
        if not source_file:
            errors.append(f"模板 {code} 缺少 source_file。")
        elif source_dir_usable and not (source_dir / source_file).exists():
            errors.append(f"模板源文件不存在:{source_file}")
        if not output_name:
            errors.append(f"模板 {code} 缺少 output_name。")
    return errors

View File

@@ -0,0 +1,34 @@
from __future__ import annotations
import shutil
from pathlib import Path
from review_agent.regulatory_info_package.schemas import TemplateSpec
from review_agent.regulatory_info_package.storage import ensure_batch_subdir
from review_agent.models import RegulatoryInfoPackageBatch
def template_specs(config: dict) -> list[TemplateSpec]:
    """Convert the ``templates`` entries of the configuration into specs.

    ``code``, ``output_name`` and ``source_file`` are required keys. The
    other settings fall back to defaults: docx format, strategy equal to the
    code, included in the zip, no native legacy doc preference, docx fallback
    allowed, no fields.
    """
    specs: list[TemplateSpec] = []
    for item in config.get("templates") or []:
        spec = TemplateSpec(
            code=item["code"],
            output_name=item["output_name"],
            source_file=item["source_file"],
            file_format=item.get("file_format", "docx"),
            strategy=item.get("strategy", item["code"]),
            include_in_zip=bool(item.get("include_in_zip", True)),
            prefer_legacy_doc_native=bool(item.get("prefer_legacy_doc_native", False)),
            allow_docx_fallback=bool(item.get("allow_docx_fallback", True)),
            fields=item.get("fields") or [],
        )
        specs.append(spec)
    return specs
def copy_template_to_batch(batch: RegulatoryInfoPackageBatch, config: dict, spec: TemplateSpec) -> Path:
    """Copy a template's source file into the batch's ``templates`` folder.

    The copy is named ``<code>.source<original suffix>``.

    Returns:
        The path of the copied file.
    """
    original = Path(config["source_dir"]) / spec.source_file
    batch_templates_dir = ensure_batch_subdir(batch, "templates")
    destination = batch_templates_dir / f"{spec.code}.source{original.suffix}"
    shutil.copy2(original, destination)
    return destination

View File

@@ -0,0 +1,51 @@
from __future__ import annotations
import json
from pathlib import Path
from openpyxl import Workbook
from review_agent.regulatory_info_package.schemas import MergedField
# Column order of the traceability sheet. Each name is also the key read
# from the row dicts built in save_traceability_exports.
HEADERS = [
    "target_file",
    "target_field",
    "final_value",
    "extraction_source",
    "evidence",
    "highlight_reason",
    "needs_review",
]
def save_traceability_exports(root: str | Path, merged_fields: dict[str, MergedField]) -> tuple[Path, Path]:
    """Write the traceability list as an Excel workbook and a JSON log.

    The workbook goes to ``<root>/exports/traceability.xlsx`` and the JSON to
    ``<root>/logs/traceability.json``; both directories are created.
    ``target_file`` is written as an empty string for every row.

    Returns:
        ``(excel_path, json_path)``.
    """
    base = Path(root)
    exports_dir = base / "exports"
    logs_dir = base / "logs"
    for directory in (exports_dir, logs_dir):
        directory.mkdir(parents=True, exist_ok=True)

    rows = []
    for merged_field in merged_fields.values():
        rows.append(
            {
                "target_file": "",
                "target_field": merged_field.label,
                "final_value": merged_field.value,
                "extraction_source": merged_field.source,
                "evidence": merged_field.evidence,
                "highlight_reason": merged_field.highlight_reason,
                "needs_review": merged_field.needs_review,
            }
        )

    excel_path = exports_dir / "traceability.xlsx"
    workbook = Workbook()
    sheet = workbook.active
    sheet.title = "traceability"
    sheet.append(HEADERS)
    for row in rows:
        sheet.append([row.get(column, "") for column in HEADERS])
    workbook.save(excel_path)

    json_path = logs_dir / "traceability.json"
    json_path.write_text(json.dumps(rows, ensure_ascii=False, indent=2), encoding="utf-8")
    return excel_path, json_path

View File

@@ -0,0 +1,23 @@
from __future__ import annotations
from pathlib import Path
from zipfile import ZIP_DEFLATED, ZipFile
from review_agent.regulatory_info_package.constants import DEFAULT_ZIP_NAME, GENERATED_FILE_FALLBACK_SUCCESS, GENERATED_FILE_SUCCESS
from review_agent.regulatory_info_package.schemas import GeneratedFileResult
def create_zip_package(root: str | Path, generated_files: list[GeneratedFileResult], zip_name: str = DEFAULT_ZIP_NAME) -> Path:
    """Bundle the generated files into ``<root>/exports/<zip_name>``.

    Only results with a success or fallback-success status, a non-empty path
    and an existing file are added. Each is stored under its ``file_name``.

    Returns:
        The path of the zip archive.
    """
    exports_dir = Path(root) / "exports"
    exports_dir.mkdir(parents=True, exist_ok=True)
    zip_path = exports_dir / zip_name

    packable_statuses = (GENERATED_FILE_SUCCESS, GENERATED_FILE_FALLBACK_SUCCESS)
    with ZipFile(zip_path, "w", compression=ZIP_DEFLATED) as archive:
        for result in generated_files:
            if result.status in packable_statuses and result.path:
                candidate = Path(result.path)
                if candidate.exists():
                    archive.write(candidate, arcname=result.file_name)
    return zip_path