93 lines
3.8 KiB
Python
93 lines
3.8 KiB
Python
from __future__ import annotations
|
||
|
||
from collections.abc import Callable
|
||
|
||
from review_agent.regulatory_review.schemas import Finding
|
||
|
||
|
||
def run_structure_check(
    document_texts: dict[str, str],
    rule_set: dict,
    progress_callback: Callable[[dict[str, object]], None] | None = None,
) -> list[Finding]:
    """Check the supplied documents against the structural rules in ``rule_set``.

    Two checks run for every entry of ``rule_set["requirements"]``:

    1. When the requirement has a truthy ``structure_required`` flag, its
       ``title`` or one of its ``aliases`` must occur somewhere in the
       concatenated text of all documents; otherwise a missing-chapter
       finding is recorded.
    2. When the requirement lists ``required_sections``, the documents
       selected by ``file_keywords`` are searched for each section name and
       a finding is recorded for every section that is absent. If no
       document matches the keywords, this check is skipped for the
       requirement.

    Args:
        document_texts: Mapping of document name to extracted text.
        rule_set: Rule configuration; only its ``"requirements"`` entry is
            read here.
        progress_callback: Optional callable invoked once per requirement
            with a dict holding ``processed``, ``total``, ``label`` and
            ``finding_count``.

    Returns:
        The findings collected across all requirements, in rule order.

    Raises:
        KeyError: If a requirement that has to be reported (or labelled for
            the progress callback) lacks ``"title"``, or a reported
            requirement lacks ``"code"``.
    """
    findings: list[Finding] = []
    combined_all_text = "\n".join(document_texts.values())
    # ``or []`` (rather than a ``.get`` default) also covers keys that are
    # present but explicitly set to None.
    requirements = list(rule_set.get("requirements") or [])
    total = len(requirements)

    for index, requirement in enumerate(requirements, start=1):
        aliases = requirement.get("aliases") or []
        severity = requirement.get("severity", "medium")
        suggestion = requirement.get("suggestion", "")

        # Check 1: the chapter itself must be mentioned somewhere.
        if requirement.get("structure_required") and not _contains_any(
            combined_all_text,
            [requirement.get("title", ""), *aliases],
        ):
            numbered_title = _numbered_title(requirement)
            findings.append(
                Finding(
                    rule_code=requirement["code"],
                    category="structure",
                    severity=severity,
                    title=f"申报资料目录缺少{numbered_title}章节",
                    detail=f"未在申报资料目录或章节标题候选中发现{numbered_title}。",
                    suggestion=suggestion,
                    evidence={
                        "attachment4_code": requirement.get("attachment4_code"),
                        "expected_title": requirement["title"],
                        "aliases": aliases,
                    },
                )
            )

        # Check 2: required sub-sections inside the documents matched by keyword.
        required_sections = requirement.get("required_sections") or []
        if required_sections:
            matching_docs = _matching_documents(
                document_texts, requirement.get("file_keywords", [])
            )
            if matching_docs:
                combined_text = "\n".join(matching_docs.values())
                for section in required_sections:
                    if _contains_any(combined_text, [section]):
                        continue
                    findings.append(
                        Finding(
                            rule_code=f"{requirement['code']}:{section}",
                            category="structure",
                            severity=severity,
                            title=f"{requirement['title']}缺少{section}章节",
                            detail=f"已匹配{requirement['title']}文件,但未发现{section}相关内容。",
                            suggestion=suggestion,
                            evidence={"section": section, "files": list(matching_docs)},
                        )
                    )

        # Explicit None test: a callable object that happens to be falsy
        # must still be notified.
        if progress_callback is not None:
            progress_callback(
                {
                    "processed": index,
                    "total": total,
                    "label": _numbered_title(requirement),
                    "finding_count": len(findings),
                }
            )

    return findings
|
||
|
||
|
||
def _matching_documents(document_texts: dict[str, str], keywords: list[str]) -> dict[str, str]:
|
||
if not keywords:
|
||
return document_texts
|
||
result = {}
|
||
for name, text in document_texts.items():
|
||
haystack = f"{name}\n{text}".lower()
|
||
if any(str(keyword).lower() in haystack for keyword in keywords):
|
||
result[name] = text
|
||
return result
|
||
|
||
|
||
def _contains_any(text: str, needles: list[str]) -> bool:
    """Return True when any usable needle occurs in ``text`` after normalization.

    Both sides are passed through ``_normalize_title`` before the substring
    test. Falsy needles are skipped, and so are needles that normalize to an
    empty string: ``"" in haystack`` is always True, so a blank needle would
    otherwise count as a match for every text.
    """
    haystack = _normalize_title(text)
    for needle in needles:
        if not needle:
            continue
        candidate = _normalize_title(needle)
        if candidate and candidate in haystack:
            return True
    return False
|
||
|
||
|
||
def _normalize_title(value: str) -> str:
|
||
return "".join(str(value).lower().replace("/", "").replace("/", "").split())
|
||
|
||
|
||
def _numbered_title(requirement: dict) -> str:
|
||
attachment4_code = requirement.get("attachment4_code")
|
||
if not attachment4_code:
|
||
return requirement["title"]
|
||
return f"{attachment4_code}{requirement['title']}"
|