Files
DEMO-AGENT/review_agent/regulatory_review/services/rectification_review.py

78 lines
3.0 KiB
Python

from __future__ import annotations
import json
from django.utils import timezone
from review_agent.models import FileSummaryBatch, RegulatoryIssue, RegulatoryReviewBatch
from review_agent.regulatory_review.services.rule_loader import load_rule_file
from review_agent.regulatory_review.storage import save_artifact
def review_missing_issues(
    *,
    batch: RegulatoryReviewBatch,
    issue_ids: list[int],
    file_summary_batch: FileSummaryBatch,
) -> dict[str, object]:
    """Re-review selected issues of a batch against a file summary batch.

    For every issue of ``batch`` whose primary key is in ``issue_ids``, the
    matching rule's ``file_keywords`` plus the issue title are searched for in
    the items of ``file_summary_batch``. An issue with at least one matching
    file is set to ``REVIEW_PASSED``, otherwise to ``REVIEW_FAILED``. The
    outcome is stored under ``evidence["latest_review"]`` (other evidence keys
    are kept) and the whole run is saved as a JSON artifact on ``batch``.

    Args:
        batch: Review batch that owns the issues being re-reviewed.
        issue_ids: Primary keys of the issues to review. Ids that are not
            issues of ``batch`` are not selected by the query and therefore
            do not appear in the returned record.
        file_summary_batch: File summary batch whose items are searched.

    Returns:
        The review record: batch identifiers, ``reviewed_at``, one entry per
        reviewed issue under ``items``, and the ``artifact_id`` of the saved
        artifact.
    """
    rule_set = load_rule_file()
    # ``or []`` also covers a "requirements" key that is present but null.
    requirements = rule_set.get("requirements") or []
    rules_by_code = {rule["code"]: rule for rule in requirements}
    items = list(file_summary_batch.items.order_by("file_index"))

    # Read the clock once so that ``reviewed_at`` and the artifact file name
    # always describe the same instant.
    now = timezone.now()
    record = {
        "type": "review_record",
        "reviewed_at": timezone.localtime(now).isoformat(),
        "source_review_batch_id": batch.pk,
        "source_review_batch_no": batch.batch_no,
        "file_summary_batch_id": file_summary_batch.pk,
        "file_summary_batch_no": file_summary_batch.batch_no,
        "items": [],
    }

    issues = RegulatoryIssue.objects.filter(batch=batch, pk__in=issue_ids).order_by("id")
    for issue in issues:
        # Issues whose rule code is unknown fall back to title-only matching.
        rule = rules_by_code.get(issue.rule_code, {})

        rule_keywords = rule.get("file_keywords") or []
        if isinstance(rule_keywords, str):
            # A single keyword given as a bare string must stay one keyword;
            # unpacking it would yield one keyword per character.
            rule_keywords = [rule_keywords]

        matched_files = _match_items(items, [*rule_keywords, issue.title])
        passed = bool(matched_files)
        issue.status = (
            RegulatoryIssue.Status.REVIEW_PASSED
            if passed
            else RegulatoryIssue.Status.REVIEW_FAILED
        )
        issue.evidence = {
            **(issue.evidence or {}),
            "latest_review": {
                "file_summary_batch_id": file_summary_batch.pk,
                "file_summary_batch_no": file_summary_batch.batch_no,
                "matched_files": matched_files,
            },
        }
        issue.save(update_fields=["status", "evidence", "updated_at"])
        record["items"].append(
            {
                "issue_id": issue.pk,
                "rule_code": issue.rule_code,
                "title": issue.title,
                "status": issue.status,
                "matched_files": matched_files,
            }
        )

    # NOTE(review): the per-issue saves above are not rolled back if storing
    # the artifact fails -- confirm whether callers wrap this in a transaction.
    artifact = save_artifact(
        batch,
        name=f"review_record_{now.strftime('%Y%m%d%H%M%S')}.json",
        artifact_type="json",
        content=json.dumps(record, ensure_ascii=False, indent=2),
        metadata={"artifact": "review_record", "file_summary_batch_id": file_summary_batch.pk},
    )
    record["artifact_id"] = artifact.pk
    return record
def _match_items(items, keywords: list[str]) -> list[dict[str, str]]:
    """Return descriptors of the items that mention at least one keyword.

    Matching is a case-insensitive substring search over the item's
    ``file_name``, ``relative_path`` and ``directory_level`` joined by single
    spaces.

    Args:
        items: Iterable of objects exposing ``file_name``, ``relative_path``
            and ``directory_level`` attributes.
        keywords: Candidate keywords. Falsy entries and entries that are
            empty after stripping surrounding whitespace are ignored.

    Returns:
        One dict per matching item, in input order, with the keys
        ``file_name``, ``relative_path`` and ``directory_level``.
    """
    # Strip before filtering: the haystack is space-joined, so a
    # whitespace-only keyword (e.g. a blank title) would otherwise be a
    # substring of every haystack and match every file.
    normalized_keywords = [
        text
        for text in (str(keyword).strip().lower() for keyword in keywords if keyword)
        if text
    ]
    if not normalized_keywords:
        return []

    matched = []
    for item in items:
        haystack = f"{item.file_name} {item.relative_path} {item.directory_level}".lower()
        if any(keyword in haystack for keyword in normalized_keywords):
            matched.append(
                {
                    "file_name": item.file_name,
                    "relative_path": item.relative_path,
                    "directory_level": item.directory_level,
                }
            )
    return matched